• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3530

16 Nov 2024 07:44AM UTC coverage: 60.219% (-0.7%) from 60.888%
#3530

push

travis-ci

web-flow
Update 03-ad.md

118417 of 252124 branches covered (46.97%)

Branch coverage included in aggregate %.

198982 of 274951 relevant lines covered (72.37%)

6072359.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.32
/source/libs/executor/src/streamfilloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "filter.h"
17
#include "os.h"
18
#include "query.h"
19
#include "taosdef.h"
20
#include "tmsg.h"
21
#include "ttypes.h"
22

23
#include "executorInt.h"
24
#include "streamexecutorInt.h"
25
#include "tcommon.h"
26
#include "thash.h"
27
#include "ttime.h"
28

29
#include "function.h"
30
#include "operator.h"
31
#include "querynodes.h"
32
#include "querytask.h"
33
#include "tdatablock.h"
34
#include "tfill.h"
35

36
#define FILL_POS_INVALID 0
37
#define FILL_POS_START   1
38
#define FILL_POS_MID     2
39
#define FILL_POS_END     3
40

41
typedef struct STimeRange {
42
  TSKEY    skey;
43
  TSKEY    ekey;
44
  uint64_t groupId;
45
} STimeRange;
46

47
TSKEY getNextWindowTs(TSKEY ts, SInterval* pInterval) {
31✔
48
  STimeWindow win = {.skey = ts, .ekey = ts};
31✔
49
  getNextTimeWindow(pInterval, &win, TSDB_ORDER_ASC);
31✔
50
  return win.skey;
31✔
51
}
52

53
TSKEY getPrevWindowTs(TSKEY ts, SInterval* pInterval) {
15✔
54
  STimeWindow win = {.skey = ts, .ekey = ts};
15✔
55
  getNextTimeWindow(pInterval, &win, TSDB_ORDER_DESC);
15✔
56
  return win.skey;
15✔
57
}
58

59
int32_t setRowCell(SColumnInfoData* pCol, int32_t rowId, const SResultCellData* pCell) {
72,197✔
60
  return colDataSetVal(pCol, rowId, pCell->pData, pCell->isNull);
72,197✔
61
}
62

63
SResultCellData* getResultCell(SResultRowData* pRaw, int32_t index) {
131,032✔
64
  if (!pRaw || !pRaw->pRowVal) {
131,032!
65
    return NULL;
×
66
  }
67
  char*            pData = (char*)pRaw->pRowVal;
131,086✔
68
  SResultCellData* pCell = pRaw->pRowVal;
131,086✔
69
  for (int32_t i = 0; i < index; i++) {
808,943✔
70
    pData += (pCell->bytes + sizeof(SResultCellData));
677,857✔
71
    pCell = (SResultCellData*)pData;
677,857✔
72
  }
73
  return pCell;
131,086✔
74
}
75

76
void* destroyFillColumnInfo(SFillColInfo* pFillCol, int32_t start, int32_t end) {
238✔
77
  for (int32_t i = start; i < end; i++) {
490✔
78
    destroyExprInfo(pFillCol[i].pExpr, 1);
252✔
79
    taosVariantDestroy(&pFillCol[i].fillVal);
252✔
80
  }
81
  if (start < end) {
238✔
82
    taosMemoryFreeClear(pFillCol[start].pExpr);
227!
83
  }
84
  taosMemoryFree(pFillCol);
238✔
85
  return NULL;
238✔
86
}
87

88
void destroyStreamFillSupporter(SStreamFillSupporter* pFillSup) {
238✔
89
  if (pFillSup == NULL) {
238!
90
    return;
×
91
  }
92
  pFillSup->pAllColInfo = destroyFillColumnInfo(pFillSup->pAllColInfo, pFillSup->numOfFillCols, pFillSup->numOfAllCols);
238✔
93
  tSimpleHashCleanup(pFillSup->pResMap);
238✔
94
  pFillSup->pResMap = NULL;
238✔
95
  cleanupExprSupp(&pFillSup->notFillExprSup);
238✔
96
  if (pFillSup->cur.pRowVal != pFillSup->prev.pRowVal && pFillSup->cur.pRowVal != pFillSup->next.pRowVal) {
238✔
97
    taosMemoryFree(pFillSup->cur.pRowVal);
108✔
98
  }
99
  taosMemoryFree(pFillSup->prev.pRowVal);
238✔
100
  taosMemoryFree(pFillSup->next.pRowVal);
238✔
101
  taosMemoryFree(pFillSup->nextNext.pRowVal);
238✔
102

103
  taosMemoryFree(pFillSup);
238✔
104
}
105

106
void destroySPoint(void* ptr) {
20,200✔
107
  SPoint* point = (SPoint*)ptr;
20,200✔
108
  taosMemoryFreeClear(point->val);
20,200!
109
}
20,200✔
110

111
void destroyStreamFillLinearInfo(SStreamFillLinearInfo* pFillLinear) {
238✔
112
  taosArrayDestroyEx(pFillLinear->pEndPoints, destroySPoint);
238✔
113
  taosArrayDestroyEx(pFillLinear->pNextEndPoints, destroySPoint);
238✔
114
  taosMemoryFree(pFillLinear);
238✔
115
}
238✔
116

117
void destroyStreamFillInfo(SStreamFillInfo* pFillInfo) {
238✔
118
  if (pFillInfo == NULL) {
238!
119
    return;
×
120
  } 
121
  if (pFillInfo->type == TSDB_FILL_SET_VALUE || pFillInfo->type == TSDB_FILL_SET_VALUE_F ||
238✔
122
      pFillInfo->type == TSDB_FILL_NULL || pFillInfo->type == TSDB_FILL_NULL_F) {
193✔
123
    taosMemoryFreeClear(pFillInfo->pResRow->pRowVal);
105!
124
    taosMemoryFreeClear(pFillInfo->pResRow);
105!
125
    taosMemoryFreeClear(pFillInfo->pNonFillRow->pRowVal);
105!
126
    taosMemoryFreeClear(pFillInfo->pNonFillRow);
105!
127
  }
128
  destroyStreamFillLinearInfo(pFillInfo->pLinearInfo);
238✔
129
  pFillInfo->pLinearInfo = NULL;
238✔
130

131
  taosArrayDestroy(pFillInfo->delRanges);
238✔
132
  taosMemoryFree(pFillInfo);
238✔
133
}
134

135
static void destroyStreamFillOperatorInfo(void* param) {
227✔
136
  SStreamFillOperatorInfo* pInfo = (SStreamFillOperatorInfo*)param;
227✔
137
  destroyStreamFillInfo(pInfo->pFillInfo);
227✔
138
  destroyStreamFillSupporter(pInfo->pFillSup);
227✔
139
  blockDataDestroy(pInfo->pRes);
227✔
140
  pInfo->pRes = NULL;
227✔
141
  blockDataDestroy(pInfo->pSrcBlock);
227✔
142
  pInfo->pSrcBlock = NULL;
227✔
143
  blockDataDestroy(pInfo->pDelRes);
227✔
144
  pInfo->pDelRes = NULL;
227✔
145
  taosArrayDestroy(pInfo->matchInfo.pList);
227✔
146
  pInfo->matchInfo.pList = NULL;
227✔
147
  taosArrayDestroy(pInfo->pUpdated);
227✔
148
  clearGroupResInfo(&pInfo->groupResInfo);
227✔
149
  taosArrayDestroy(pInfo->pCloseTs);
227✔
150

151
  taosMemoryFree(pInfo);
227✔
152
}
227✔
153

154
static void resetFillWindow(SResultRowData* pRowData) {
6,143✔
155
  pRowData->key = INT64_MIN;
6,143✔
156
  taosMemoryFreeClear(pRowData->pRowVal);
6,143✔
157
}
6,143✔
158

159
static void resetPrevAndNextWindow(SStreamFillSupporter* pFillSup) {
1,343✔
160
  if (pFillSup->cur.pRowVal != pFillSup->prev.pRowVal && pFillSup->cur.pRowVal != pFillSup->next.pRowVal) {
1,343✔
161
    resetFillWindow(&pFillSup->cur);
1,024✔
162
  } else {
163
    pFillSup->cur.key = INT64_MIN;
319✔
164
    pFillSup->cur.pRowVal = NULL;
319✔
165
  }
166
  resetFillWindow(&pFillSup->prev);
1,343✔
167
  resetFillWindow(&pFillSup->next);
1,343✔
168
  resetFillWindow(&pFillSup->nextNext);
1,343✔
169
}
1,343✔
170

171
void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, SStreamFillSupporter* pFillSup) {
1,343✔
172
  SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
1,343✔
173
  void*        pState = pOperator->pTaskInfo->streamInfo.pState;
1,343✔
174
  resetPrevAndNextWindow(pFillSup);
1,343✔
175

176
  SWinKey key = {.ts = ts, .groupId = groupId};
1,343✔
177
  void*   curVal = NULL;
1,343✔
178
  int32_t curVLen = 0;
1,343✔
179
  bool    hasCurKey = true;
1,343✔
180
  int32_t code = pAPI->stateStore.streamStateFillGet(pState, &key, (void**)&curVal, &curVLen, NULL);
1,343✔
181
  if (code == TSDB_CODE_SUCCESS) {
1,343✔
182
    pFillSup->cur.key = key.ts;
1,303✔
183
    pFillSup->cur.pRowVal = curVal;
1,303✔
184
  } else {
185
    qDebug("streamStateFillGet key failed, Data may be deleted. ts:%" PRId64 ", groupId:%" PRId64, ts, groupId);
40!
186
    pFillSup->cur.key = ts;
40✔
187
    pFillSup->cur.pRowVal = NULL;
40✔
188
    hasCurKey = false;
40✔
189
  }
190

191
  SStreamStateCur* pCur = pAPI->stateStore.streamStateFillSeekKeyPrev(pState, &key);
1,343✔
192
  SWinKey          preKey = {.ts = INT64_MIN, .groupId = groupId};
1,343✔
193
  void*            preVal = NULL;
1,343✔
194
  int32_t          preVLen = 0;
1,343✔
195
  code = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &preKey, (const void**)&preVal, &preVLen);
1,343✔
196

197
  if (code == TSDB_CODE_SUCCESS) {
1,343✔
198
    pFillSup->prev.key = preKey.ts;
1,038✔
199
    pFillSup->prev.pRowVal = preVal;
1,038✔
200

201
    if (hasCurKey) {
1,038✔
202
      pAPI->stateStore.streamStateCurNext(pState, pCur);
998✔
203
    }
204

205
    pAPI->stateStore.streamStateCurNext(pState, pCur);
1,038✔
206
  } else {
207
    pAPI->stateStore.streamStateFreeCur(pCur);
305✔
208
    pCur = pAPI->stateStore.streamStateFillSeekKeyNext(pState, &key);
305✔
209
  }
210

211
  SWinKey nextKey = {.ts = INT64_MIN, .groupId = groupId};
1,343✔
212
  void*   nextVal = NULL;
1,343✔
213
  int32_t nextVLen = 0;
1,343✔
214
  code = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &nextKey, (const void**)&nextVal, &nextVLen);
1,343✔
215
  if (code == TSDB_CODE_SUCCESS) {
1,343✔
216
    pFillSup->next.key = nextKey.ts;
633✔
217
    pFillSup->next.pRowVal = nextVal;
633✔
218
    if (pFillSup->type == TSDB_FILL_PREV || pFillSup->type == TSDB_FILL_NEXT) {
633✔
219
      pAPI->stateStore.streamStateCurNext(pState, pCur);
247✔
220
      SWinKey nextNextKey = {.groupId = groupId};
247✔
221
      void*   nextNextVal = NULL;
247✔
222
      int32_t nextNextVLen = 0;
247✔
223
      code = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &nextNextKey, (const void**)&nextNextVal, &nextNextVLen);
247✔
224
      if (code == TSDB_CODE_SUCCESS) {
247✔
225
        pFillSup->nextNext.key = nextNextKey.ts;
123✔
226
        pFillSup->nextNext.pRowVal = nextNextVal;
123✔
227
      }
228
    }
229
  }
230
  pAPI->stateStore.streamStateFreeCur(pCur);
1,343✔
231
}
1,343✔
232

233
bool hasCurWindow(SStreamFillSupporter* pFillSup) { return pFillSup->cur.key != INT64_MIN; }
×
234
bool hasPrevWindow(SStreamFillSupporter* pFillSup) { return pFillSup->prev.key != INT64_MIN; }
4,211✔
235
bool hasNextWindow(SStreamFillSupporter* pFillSup) { return pFillSup->next.key != INT64_MIN; }
2,584✔
236
static bool hasNextNextWindow(SStreamFillSupporter* pFillSup) { return pFillSup->nextNext.key != INT64_MIN; }
81✔
237

238
static void transBlockToResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SResultRowData* pRowVal) {
1,312✔
239
  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
1,312✔
240
  for (int32_t i = 0; i < numOfCols; ++i) {
20,735✔
241
    SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
19,426✔
242
    SResultCellData* pCell = getResultCell(pRowVal, i);
19,426✔
243
    if (!colDataIsNull_s(pColData, rowId)) {
38,846!
244
      pCell->isNull = false;
19,425✔
245
      pCell->type = pColData->info.type;
19,425✔
246
      pCell->bytes = pColData->info.bytes;
19,425✔
247
      char* val = colDataGetData(pColData, rowId);
19,425!
248
      if (IS_VAR_DATA_TYPE(pCell->type)) {
19,425!
249
        memcpy(pCell->pData, val, varDataTLen(val));
38✔
250
      } else {
251
        memcpy(pCell->pData, val, pCell->bytes);
19,387✔
252
      }
253
    } else {
254
      pCell->isNull = true;
×
255
    }
256
  }
257
  pRowVal->key = ts;
1,309✔
258
}
1,309✔
259

260
static void calcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol, int32_t numOfCol) {
219✔
261
  for (int32_t i = 0; i < numOfCol; i++) {
3,466✔
262
    if (!pFillCol[i].notFillCol) {
3,247✔
263
      int32_t          slotId = GET_DEST_SLOT_ID(pFillCol + i);
3,028✔
264
      SResultCellData* pECell = getResultCell(pEndRow, slotId);
3,028✔
265
      SPoint*          pPoint = taosArrayGet(pEndPoins, slotId);
3,028✔
266
      pPoint->key = pEndRow->key;
3,028✔
267
      memcpy(pPoint->val, pECell->pData, pECell->bytes);
3,028✔
268
    }
269
  }
270
}
219✔
271

272
static void setFillInfoStart(TSKEY ts, SInterval* pInterval, SStreamFillInfo* pFillInfo) {
1,033✔
273
  ts = taosTimeAdd(ts, pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
1,033✔
274
  pFillInfo->start = ts;
1,033✔
275
}
1,033✔
276

277
static void setFillInfoEnd(TSKEY ts, SInterval* pInterval, SStreamFillInfo* pFillInfo) {
1,033✔
278
  ts = taosTimeAdd(ts, pInterval->sliding * -1, pInterval->slidingUnit, pInterval->precision);
1,033✔
279
  pFillInfo->end = ts;
1,033✔
280
}
1,033✔
281

282
static void setFillKeyInfo(TSKEY start, TSKEY end, SInterval* pInterval, SStreamFillInfo* pFillInfo) {
1,033✔
283
  setFillInfoStart(start, pInterval, pFillInfo);
1,033✔
284
  pFillInfo->current = pFillInfo->start;
1,033✔
285
  setFillInfoEnd(end, pInterval, pFillInfo);
1,033✔
286
}
1,033✔
287

288
void setDeleteFillValueInfo(TSKEY start, TSKEY end, SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo) {
131✔
289
  if (!hasPrevWindow(pFillSup) || !hasNextWindow(pFillSup)) {
131✔
290
    pFillInfo->needFill = false;
51✔
291
    return;
51✔
292
  }
293

294
  TSKEY realStart = taosTimeAdd(pFillSup->prev.key, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
80✔
295
                                pFillSup->interval.precision);
80✔
296

297
  pFillInfo->needFill = true;
80✔
298
  pFillInfo->start = realStart;
80✔
299
  pFillInfo->current = pFillInfo->start;
80✔
300
  pFillInfo->end = end;
80✔
301
  pFillInfo->pos = FILL_POS_INVALID;
80✔
302
  switch (pFillInfo->type) {
80!
303
    case TSDB_FILL_NULL:
32✔
304
    case TSDB_FILL_NULL_F:
305
    case TSDB_FILL_SET_VALUE:
306
    case TSDB_FILL_SET_VALUE_F:
307
      break;
32✔
308
    case TSDB_FILL_PREV:
16✔
309
      pFillInfo->pResRow = &pFillSup->prev;
16✔
310
      break;
16✔
311
    case TSDB_FILL_NEXT:
16✔
312
      pFillInfo->pResRow = &pFillSup->next;
16✔
313
      break;
16✔
314
    case TSDB_FILL_LINEAR: {
16✔
315
      setFillKeyInfo(pFillSup->prev.key, pFillSup->next.key, &pFillSup->interval, pFillInfo);
16✔
316
      pFillInfo->pLinearInfo->hasNext = false;
16✔
317
      pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
16✔
318
      calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
16✔
319
                       pFillSup->numOfAllCols);
320
      pFillInfo->pResRow = &pFillSup->prev;
16✔
321
      pFillInfo->pLinearInfo->winIndex = 0;
16✔
322
    } break;
16✔
323
    default:
×
324
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
325
      break;
×
326
  }
327
}
328

329
void copyNotFillExpData(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo) {
492✔
330
  for (int32_t i = pFillSup->numOfFillCols; i < pFillSup->numOfAllCols; ++i) {
1,039✔
331
    SFillColInfo*    pFillCol = pFillSup->pAllColInfo + i;
547✔
332
    int32_t          slotId = GET_DEST_SLOT_ID(pFillCol);
547✔
333
    SResultCellData* pCell = getResultCell(pFillInfo->pResRow, slotId);
547✔
334
    SResultCellData* pCurCell = getResultCell(&pFillSup->cur, slotId);
547✔
335
    pCell->isNull = pCurCell->isNull;
547✔
336
    if (!pCurCell->isNull) {
547!
337
      memcpy(pCell->pData, pCurCell->pData, pCell->bytes);
547✔
338
    }
339
  }
340
}
492✔
341

342
void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillSupporter* pFillSup,
1,212✔
343
                      SStreamFillInfo* pFillInfo) {
344
  pFillInfo->preRowKey = pFillSup->cur.key;
1,212✔
345
  if (!hasPrevWindow(pFillSup) && !hasNextWindow(pFillSup)) {
1,212✔
346
    pFillInfo->needFill = false;
209✔
347
    pFillInfo->pos = FILL_POS_START;
209✔
348
    return;
209✔
349
  }
350
  TSKEY prevWKey = INT64_MIN;
1,003✔
351
  TSKEY nextWKey = INT64_MIN;
1,003✔
352
  if (hasPrevWindow(pFillSup)) {
1,003✔
353
    prevWKey = pFillSup->prev.key;
620✔
354
  }
355
  if (hasNextWindow(pFillSup)) {
1,003✔
356
    nextWKey = pFillSup->next.key;
538✔
357
  }
358

359
  pFillInfo->needFill = true;
1,003✔
360
  pFillInfo->pos = FILL_POS_INVALID;
1,003✔
361
  switch (pFillInfo->type) {
1,003!
362
    case TSDB_FILL_NULL:
417✔
363
    case TSDB_FILL_NULL_F:
364
    case TSDB_FILL_SET_VALUE:
365
    case TSDB_FILL_SET_VALUE_F: {
366
      if (pFillSup->prev.key == pFillInfo->preRowKey) {
417!
367
        resetFillWindow(&pFillSup->prev);
×
368
      }
369
      if (hasPrevWindow(pFillSup) && hasNextWindow(pFillSup)) {
417✔
370
        if (pFillSup->next.key == pFillInfo->nextRowKey) {
83✔
371
          pFillInfo->preRowKey = INT64_MIN;
77✔
372
          setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
77✔
373
          pFillInfo->pos = FILL_POS_END;
77✔
374
        } else {
375
          pFillInfo->needFill = false;
6✔
376
          pFillInfo->pos = FILL_POS_START;
6✔
377
        }
378
      } else if (hasPrevWindow(pFillSup)) {
334✔
379
        setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
192✔
380
        pFillInfo->pos = FILL_POS_END;
192✔
381
      } else {
382
        setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
142✔
383
        pFillInfo->pos = FILL_POS_START;
142✔
384
      }
385
      copyNotFillExpData(pFillSup, pFillInfo);
417✔
386
    } break;
417✔
387
    case TSDB_FILL_PREV: {
200✔
388
      if (hasNextWindow(pFillSup) && ((pFillSup->next.key != pFillInfo->nextRowKey) ||
200✔
389
                                      (pFillSup->next.key == pFillInfo->nextRowKey && hasNextNextWindow(pFillSup)) ||
81!
390
                                      (pFillSup->next.key == pFillInfo->nextRowKey && !hasPrevWindow(pFillSup)))) {
50!
391
        setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
85✔
392
        pFillInfo->pos = FILL_POS_START;
85✔
393
        resetFillWindow(&pFillSup->prev);
85✔
394
        pFillSup->prev.key = pFillSup->cur.key;
85✔
395
        pFillSup->prev.pRowVal = pFillSup->cur.pRowVal;
85✔
396
      } else if (hasPrevWindow(pFillSup)) {
115!
397
        setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
115✔
398
        pFillInfo->pos = FILL_POS_END;
115✔
399
        pFillInfo->preRowKey = INT64_MIN;
115✔
400
      }
401
      pFillInfo->pResRow = &pFillSup->prev;
200✔
402
    } break;
200✔
403
    case TSDB_FILL_NEXT: {
203✔
404
      if (hasPrevWindow(pFillSup)) {
203✔
405
        setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
126✔
406
        pFillInfo->pos = FILL_POS_END;
126✔
407
        resetFillWindow(&pFillSup->next);
126✔
408
        pFillSup->next.key = pFillSup->cur.key;
126✔
409
        pFillSup->next.pRowVal = pFillSup->cur.pRowVal;
126✔
410
        pFillInfo->preRowKey = INT64_MIN;
126✔
411
      } else {
412
        setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
77✔
413
        pFillInfo->pos = FILL_POS_START;
77✔
414
      }
415
      pFillInfo->pResRow = &pFillSup->next;
203✔
416
    } break;
203✔
417
    case TSDB_FILL_LINEAR: {
183✔
418
      pFillInfo->pLinearInfo->winIndex = 0;
183✔
419
      if (hasPrevWindow(pFillSup) && hasNextWindow(pFillSup)) {
183✔
420
        setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
20✔
421
        pFillInfo->pos = FILL_POS_MID;
20✔
422
        pFillInfo->pLinearInfo->nextEnd = nextWKey;
20✔
423
        calcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
20✔
424
                         pFillSup->numOfAllCols);
425
        pFillInfo->pResRow = &pFillSup->prev;
20✔
426

427
        calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pNextEndPoints, pFillSup->pAllColInfo,
20✔
428
                         pFillSup->numOfAllCols);
429
        pFillInfo->pLinearInfo->hasNext = true;
20✔
430
      } else if (hasPrevWindow(pFillSup)) {
163✔
431
        setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
79✔
432
        pFillInfo->pos = FILL_POS_END;
79✔
433
        pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
79✔
434
        calcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
79✔
435
                         pFillSup->numOfAllCols);
436
        pFillInfo->pResRow = &pFillSup->prev;
79✔
437
        pFillInfo->pLinearInfo->hasNext = false;
79✔
438
      } else {
439
        setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
84✔
440
        pFillInfo->pos = FILL_POS_START;
84✔
441
        pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
84✔
442
        calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
84✔
443
                         pFillSup->numOfAllCols);
444
        pFillInfo->pResRow = &pFillSup->cur;
84✔
445
        pFillInfo->pLinearInfo->hasNext = false;
84✔
446
      }
447
    } break;
183✔
448
    default:
×
449
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
450
      break;
×
451
  }
452
}
453

454
int32_t checkResult(SStreamFillSupporter* pFillSup, TSKEY ts, uint64_t groupId, bool* pRes) {
50,174✔
455
  int32_t code = TSDB_CODE_SUCCESS;
50,174✔
456
  int32_t lino = 0;
50,174✔
457
  SWinKey key = {.groupId = groupId, .ts = ts};
50,174✔
458
  if (tSimpleHashGet(pFillSup->pResMap, &key, sizeof(SWinKey)) != NULL) {
50,174✔
459
    (*pRes) = false;
93✔
460
    goto _end;
93✔
461
  }
462
  code = tSimpleHashPut(pFillSup->pResMap, &key, sizeof(SWinKey), NULL, 0);
50,566✔
463
  QUERY_CHECK_CODE(code, lino, _end);
50,445!
464
  (*pRes) = true;
50,445✔
465

466
_end:
50,538✔
467
  if (code != TSDB_CODE_SUCCESS) {
50,538!
468
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
469
  }
470
  return code;
50,531✔
471
}
472

473
static int32_t buildFillResult(SResultRowData* pResRow, SStreamFillSupporter* pFillSup, TSKEY ts, SSDataBlock* pBlock,
36,130✔
474
                               bool* pRes) {
475
  int32_t code = TSDB_CODE_SUCCESS;
36,130✔
476
  int32_t lino = 0;
36,130✔
477
  if (pBlock->info.rows >= pBlock->info.capacity) {
36,130✔
478
    (*pRes) = false;
7✔
479
    goto _end;
7✔
480
  }
481
  uint64_t groupId = pBlock->info.id.groupId;
36,123✔
482
  bool     ckRes = true;
36,123✔
483
  code = checkResult(pFillSup, ts, groupId, &ckRes);
36,123✔
484
  QUERY_CHECK_CODE(code, lino, _end);
36,123!
485

486
  if (pFillSup->hasDelete && !ckRes) {
36,123✔
487
    (*pRes) = true;
28✔
488
    goto _end;
28✔
489
  }
490
  for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) {
143,926✔
491
    SFillColInfo*    pFillCol = pFillSup->pAllColInfo + i;
107,820✔
492
    int32_t          slotId = GET_DEST_SLOT_ID(pFillCol);
107,820✔
493
    SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, slotId);
107,820✔
494
    SFillInfo        tmpInfo = {
107,816✔
495
               .currentKey = ts,
496
               .order = TSDB_ORDER_ASC,
497
               .interval = pFillSup->interval,
498
    };
499
    bool filled = fillIfWindowPseudoColumn(&tmpInfo, pFillCol, pColData, pBlock->info.rows);
107,816✔
500
    if (!filled) {
107,906✔
501
      SResultCellData* pCell = getResultCell(pResRow, slotId);
71,858✔
502
      code = setRowCell(pColData, pBlock->info.rows, pCell);
71,767✔
503
      QUERY_CHECK_CODE(code, lino, _end);
71,783!
504
    }
505
  }
506
  pBlock->info.rows++;
36,106✔
507
  (*pRes) = true;
36,106✔
508

509
_end:
36,141✔
510
  if (code != TSDB_CODE_SUCCESS) {
36,141!
511
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
512
  }
513
  return code;
36,130✔
514
}
515

516
bool hasRemainCalc(SStreamFillInfo* pFillInfo) {
52,922✔
517
  if (pFillInfo->current != INT64_MIN && pFillInfo->current <= pFillInfo->end) {
52,922✔
518
    return true;
48,434✔
519
  }
520
  return false;
4,488✔
521
}
522

523
static void doStreamFillNormal(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock) {
853✔
524
  int32_t code = TSDB_CODE_SUCCESS;
853✔
525
  int32_t lino = 0;
853✔
526
  while (hasRemainCalc(pFillInfo) && pBlock->info.rows < pBlock->info.capacity) {
35,477✔
527
    STimeWindow st = {.skey = pFillInfo->current, .ekey = pFillInfo->current};
34,624✔
528
    if (inWinRange(&pFillSup->winRange, &st)) {
34,624!
529
      bool res = true;
34,624✔
530
      code = buildFillResult(pFillInfo->pResRow, pFillSup, pFillInfo->current, pBlock, &res);
34,624✔
531
      QUERY_CHECK_CODE(code, lino, _end);
34,624!
532
    }
533
    pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
34,624✔
534
                                     pFillSup->interval.precision);
34,624✔
535
  }
536

537
_end:
853✔
538
  if (code != TSDB_CODE_SUCCESS) {
853!
539
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
540
  }
541
}
853✔
542

543
static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock) {
214✔
544
  int32_t code = TSDB_CODE_SUCCESS;
214✔
545
  int32_t lino = 0;
214✔
546
  while (hasRemainCalc(pFillInfo) && pBlock->info.rows < pBlock->info.capacity) {
14,675✔
547
    uint64_t    groupId = pBlock->info.id.groupId;
14,461✔
548
    SWinKey     key = {.groupId = groupId, .ts = pFillInfo->current};
14,461✔
549
    STimeWindow st = {.skey = pFillInfo->current, .ekey = pFillInfo->current};
14,461✔
550
    bool        ckRes = true;
14,461✔
551
    code = checkResult(pFillSup, pFillInfo->current, groupId, &ckRes);
14,461✔
552
    QUERY_CHECK_CODE(code, lino, _end);
14,461!
553

554
    if ((pFillSup->hasDelete && !ckRes) || !inWinRange(&pFillSup->winRange, &st)) {
14,461!
555
      pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
16✔
556
                                       pFillSup->interval.precision);
8✔
557
      pFillInfo->pLinearInfo->winIndex++;
8✔
558
      continue;
8✔
559
    }
560
    pFillInfo->pLinearInfo->winIndex++;
14,453✔
561
    for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) {
47,891✔
562
      SFillColInfo* pFillCol = pFillSup->pAllColInfo + i;
33,438✔
563
      SFillInfo     tmp = {
33,438✔
564
              .currentKey = pFillInfo->current,
33,438✔
565
              .order = TSDB_ORDER_ASC,
566
              .interval = pFillSup->interval,
567
      };
568

569
      int32_t          slotId = GET_DEST_SLOT_ID(pFillCol);
33,438✔
570
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, slotId);
33,438✔
571
      int16_t          type = pColData->info.type;
33,438✔
572
      SResultCellData* pCell = getResultCell(pFillInfo->pResRow, slotId);
33,438✔
573
      int32_t          index = pBlock->info.rows;
33,438✔
574
      if (pFillCol->notFillCol) {
33,438✔
575
        bool filled = fillIfWindowPseudoColumn(&tmp, pFillCol, pColData, index);
14,453✔
576
        if (!filled) {
14,453!
577
          code = setRowCell(pColData, index, pCell);
×
578
          QUERY_CHECK_CODE(code, lino, _end);
×
579
        }
580
      } else {
581
        if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pCell->isNull) {
18,985!
582
          colDataSetNULL(pColData, index);
31!
583
          continue;
31✔
584
        }
585
        SPoint* pEnd = taosArrayGet(pFillInfo->pLinearInfo->pEndPoints, slotId);
18,954✔
586
        double  vCell = 0;
18,954✔
587
        SPoint  start = {0};
18,954✔
588
        start.key = pFillInfo->pResRow->key;
18,954✔
589
        start.val = pCell->pData;
18,954✔
590

591
        SPoint cur = {0};
18,954✔
592
        cur.key = pFillInfo->current;
18,954✔
593
        cur.val = taosMemoryCalloc(1, pCell->bytes);
18,954✔
594
        QUERY_CHECK_NULL(cur.val, code, lino, _end, terrno);
18,954!
595
        taosGetLinearInterpolationVal(&cur, pCell->type, &start, pEnd, pCell->type);
18,954✔
596
        code = colDataSetVal(pColData, index, (const char*)cur.val, false);
18,954✔
597
        QUERY_CHECK_CODE(code, lino, _end);
18,954!
598
        destroySPoint(&cur);
18,954✔
599
      }
600
    }
601
    pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
28,906✔
602
                                     pFillSup->interval.precision);
14,453✔
603
    pBlock->info.rows++;
14,453✔
604
  }
605

606
_end:
214✔
607
  if (code != TSDB_CODE_SUCCESS) {
214!
608
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
609
  }
610
}
214✔
611

612
static void keepResultInDiscBuf(SOperatorInfo* pOperator, uint64_t groupId, SResultRowData* pRow, int32_t len) {
1,312✔
613
  SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
1,312✔
614

615
  SWinKey key = {.groupId = groupId, .ts = pRow->key};
1,312✔
616
  int32_t code = pAPI->stateStore.streamStateFillPut(pOperator->pTaskInfo->streamInfo.pState, &key, pRow->pRowVal, len);
1,312✔
617
  qDebug("===stream===fill operator save key ts:%" PRId64 " group id:%" PRIu64 "  code:%d", key.ts, key.groupId, code);
1,312!
618
  if (code != TSDB_CODE_SUCCESS) {
1,312!
619
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
620
  }
621
}
1,312✔
622

623
static void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter* pFillSup, SSDataBlock* pRes) {
1,262✔
624
  int32_t code = TSDB_CODE_SUCCESS;
1,262✔
625
  int32_t lino = 0;
1,262✔
626
  bool    res = false;
1,262✔
627
  if (pFillInfo->needFill == false) {
1,262✔
628
    code = buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes, &res);
215✔
629
    QUERY_CHECK_CODE(code, lino, _end);
215!
630
    return;
215✔
631
  }
632

633
  if (pFillInfo->pos == FILL_POS_START) {
1,047✔
634
    code = buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes, &res);
388✔
635
    QUERY_CHECK_CODE(code, lino, _end);
388!
636
    if (res) {
388!
637
      pFillInfo->pos = FILL_POS_INVALID;
388✔
638
    }
639
  }
640
  if (pFillInfo->type != TSDB_FILL_LINEAR) {
1,047✔
641
    doStreamFillNormal(pFillSup, pFillInfo, pRes);
853✔
642
  } else {
643
    doStreamFillLinear(pFillSup, pFillInfo, pRes);
194✔
644

645
    if (pFillInfo->pos == FILL_POS_MID) {
194✔
646
      code = buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes, &res);
20✔
647
      QUERY_CHECK_CODE(code, lino, _end);
20!
648
      if (res) {
20!
649
        pFillInfo->pos = FILL_POS_INVALID;
20✔
650
      }
651
    }
652

653
    if (pFillInfo->current > pFillInfo->end && pFillInfo->pLinearInfo->hasNext) {
194✔
654
      pFillInfo->pLinearInfo->hasNext = false;
20✔
655
      pFillInfo->pLinearInfo->winIndex = 0;
20✔
656
      taosArraySwap(pFillInfo->pLinearInfo->pEndPoints, pFillInfo->pLinearInfo->pNextEndPoints);
20✔
657
      pFillInfo->pResRow = &pFillSup->cur;
20✔
658
      setFillKeyInfo(pFillSup->cur.key, pFillInfo->pLinearInfo->nextEnd, &pFillSup->interval, pFillInfo);
20✔
659
      doStreamFillLinear(pFillSup, pFillInfo, pRes);
20✔
660
    }
661
  }
662
  if (pFillInfo->pos == FILL_POS_END) {
1,047✔
663
    code = buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes, &res);
596✔
664
    QUERY_CHECK_CODE(code, lino, _end);
596!
665
    if (res) {
596✔
666
      pFillInfo->pos = FILL_POS_INVALID;
589✔
667
    }
668
  }
669

670
_end:
458✔
671
  if (code != TSDB_CODE_SUCCESS) {
1,047!
672
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
673
  }
674
}
675

676
int32_t keepBlockRowInDiscBuf(SOperatorInfo* pOperator, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, TSKEY* tsCol,
1,312✔
677
                           int32_t rowId, uint64_t groupId, int32_t rowSize) {
678
  int32_t code = TSDB_CODE_SUCCESS;
1,312✔
679
  int32_t lino = 0;
1,312✔
680
  TSKEY ts = tsCol[rowId];
1,312✔
681
  pFillInfo->nextRowKey = ts;
1,312✔
682
  SResultRowData tmpNextRow = {.key = ts};
1,312✔
683
  tmpNextRow.pRowVal = taosMemoryCalloc(1, rowSize);
1,312✔
684
  QUERY_CHECK_NULL(tmpNextRow.pRowVal, code, lino, _end, terrno);
1,312!
685
  transBlockToResultRow(pBlock, rowId, ts, &tmpNextRow);
1,312✔
686
  keepResultInDiscBuf(pOperator, groupId, &tmpNextRow, rowSize);
1,312✔
687
  taosMemoryFreeClear(tmpNextRow.pRowVal);
1,312!
688

689
_end:
×
690
  if (code != TSDB_CODE_SUCCESS) {
1,312!
691
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
692
  }
693
  return code;
1,312✔
694
}
695

696
static void doFillResults(SOperatorInfo* pOperator, SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo,
1,212✔
697
                          SSDataBlock* pBlock, TSKEY* tsCol, int32_t rowId, SSDataBlock* pRes) {
698
  uint64_t groupId = pBlock->info.id.groupId;
1,212✔
699
  getWindowFromDiscBuf(pOperator, tsCol[rowId], groupId, pFillSup);
1,212✔
700
  if (pFillSup->prev.key == pFillInfo->preRowKey) {
1,212✔
701
    resetFillWindow(&pFillSup->prev);
592✔
702
  }
703
  setFillValueInfo(pBlock, tsCol[rowId], rowId, pFillSup, pFillInfo);
1,212✔
704
  doStreamFillRange(pFillInfo, pFillSup, pRes);
1,212✔
705
}
1,212✔
706

707
static void doStreamFillImpl(SOperatorInfo* pOperator) {
754✔
708
  int32_t                  code = TSDB_CODE_SUCCESS;
754✔
709
  int32_t                  lino = 0;
754✔
710
  SStreamFillOperatorInfo* pInfo = pOperator->info;
754✔
711
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
754✔
712
  SStreamFillSupporter*    pFillSup = pInfo->pFillSup;
754✔
713
  SStreamFillInfo*         pFillInfo = pInfo->pFillInfo;
754✔
714
  SSDataBlock*             pBlock = pInfo->pSrcBlock;
754✔
715
  uint64_t                 groupId = pBlock->info.id.groupId;
754✔
716
  SSDataBlock*             pRes = pInfo->pRes;
754✔
717
  SColumnInfoData*         pTsCol = taosArrayGet(pInfo->pSrcBlock->pDataBlock, pInfo->primaryTsCol);
754✔
718
  TSKEY*                   tsCol = (TSKEY*)pTsCol->pData;
754✔
719
  pRes->info.id.groupId = groupId;
754✔
720
  pInfo->srcRowIndex++;
754✔
721

722
  if (pInfo->srcRowIndex == 0) {
754!
723
    code = keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize);
754✔
724
    QUERY_CHECK_CODE(code, lino, _end);
754!
725
    pInfo->srcRowIndex++;
754✔
726
  }
727

728
  while (pInfo->srcRowIndex < pBlock->info.rows) {
1,212✔
729
    code = keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize);
458✔
730
    QUERY_CHECK_CODE(code, lino, _end);
458!
731
    doFillResults(pOperator, pFillSup, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex - 1, pRes);
458✔
732
    if (pInfo->pRes->info.rows == pInfo->pRes->info.capacity) {
458!
733
      code = blockDataUpdateTsWindow(pRes, pInfo->primaryTsCol);
×
734
      QUERY_CHECK_CODE(code, lino, _end);
×
735
      return;
×
736
    }
737
    pInfo->srcRowIndex++;
458✔
738
  }
739
  doFillResults(pOperator, pFillSup, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex - 1, pRes);
754✔
740
  code = blockDataUpdateTsWindow(pRes, pInfo->primaryTsCol);
754✔
741
  QUERY_CHECK_CODE(code, lino, _end);
754!
742
  blockDataCleanup(pInfo->pSrcBlock);
754✔
743

744
_end:
754✔
745
  if (code != TSDB_CODE_SUCCESS) {
754!
746
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
747
  }
748
}
749

750
static int32_t buildDeleteRange(SOperatorInfo* pOp, TSKEY start, TSKEY end, uint64_t groupId, SSDataBlock* delRes) {
51✔
751
  int32_t          code = TSDB_CODE_SUCCESS;
51✔
752
  int32_t          lino = 0;
51✔
753
  SStorageAPI*     pAPI = &pOp->pTaskInfo->storageAPI;
51✔
754
  void*            pState = pOp->pTaskInfo->streamInfo.pState;
51✔
755
  SExecTaskInfo*   pTaskInfo = pOp->pTaskInfo;
51✔
756
  SSDataBlock*     pBlock = delRes;
51✔
757
  SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
51✔
758
  SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
51✔
759
  SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
51✔
760
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
51✔
761
  SColumnInfoData* pCalStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
51✔
762
  SColumnInfoData* pCalEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
51✔
763
  SColumnInfoData* pTbNameCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
51✔
764
  code = colDataSetVal(pStartCol, pBlock->info.rows, (const char*)&start, false);
51✔
765
  QUERY_CHECK_CODE(code, lino, _end);
51!
766

767
  code = colDataSetVal(pEndCol, pBlock->info.rows, (const char*)&end, false);
51✔
768
  QUERY_CHECK_CODE(code, lino, _end);
51!
769

770
  colDataSetNULL(pUidCol, pBlock->info.rows);
51!
771
  code = colDataSetVal(pGroupCol, pBlock->info.rows, (const char*)&groupId, false);
51✔
772
  QUERY_CHECK_CODE(code, lino, _end);
51!
773

774
  colDataSetNULL(pCalStartCol, pBlock->info.rows);
51!
775
  colDataSetNULL(pCalEndCol, pBlock->info.rows);
51!
776

777
  SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
51✔
778

779
  void*   tbname = NULL;
51✔
780
  int32_t winCode = TSDB_CODE_SUCCESS;
51✔
781
  code = pAPI->stateStore.streamStateGetParName(pOp->pTaskInfo->streamInfo.pState, groupId, &tbname, false, &winCode);
51✔
782
  QUERY_CHECK_CODE(code, lino, _end);
51!
783
  if (winCode != TSDB_CODE_SUCCESS) {
51✔
784
    colDataSetNULL(pTableCol, pBlock->info.rows);
15!
785
  } else {
786
    char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
787
    STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
36✔
788
    code = colDataSetVal(pTableCol, pBlock->info.rows, (const char*)parTbName, false);
36✔
789
    QUERY_CHECK_CODE(code, lino, _end);
36!
790
    pAPI->stateStore.streamStateFreeVal(tbname);
36✔
791
  }
792

793
  pBlock->info.rows++;
51✔
794

795
_end:
51✔
796
  if (code != TSDB_CODE_SUCCESS) {
51!
797
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
798
  }
799
  return code;
51✔
800
}
801

802
static int32_t buildDeleteResult(SOperatorInfo* pOperator, TSKEY startTs, TSKEY endTs, uint64_t groupId,
51✔
803
                                 SSDataBlock* delRes) {
804
  int32_t                  code = TSDB_CODE_SUCCESS;
51✔
805
  int32_t                  lino = 0;
51✔
806
  SStreamFillOperatorInfo* pInfo = pOperator->info;
51✔
807
  SStreamFillSupporter*    pFillSup = pInfo->pFillSup;
51✔
808
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
51✔
809
  if (hasPrevWindow(pFillSup)) {
51✔
810
    TSKEY start = getNextWindowTs(pFillSup->prev.key, &pFillSup->interval);
31✔
811
    code = buildDeleteRange(pOperator, start, endTs, groupId, delRes);
31✔
812
    QUERY_CHECK_CODE(code, lino, _end);
31!
813
  } else if (hasNextWindow(pFillSup)) {
20✔
814
    TSKEY end = getPrevWindowTs(pFillSup->next.key, &pFillSup->interval);
15✔
815
    code = buildDeleteRange(pOperator, startTs, end, groupId, delRes);
15✔
816
    QUERY_CHECK_CODE(code, lino, _end);
15!
817
  } else {
818
    code = buildDeleteRange(pOperator, startTs, endTs, groupId, delRes);
5✔
819
    QUERY_CHECK_CODE(code, lino, _end);
5!
820
  }
821

822
_end:
5✔
823
  if (code != TSDB_CODE_SUCCESS) {
51!
824
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
825
  }
826
  return code;
51✔
827
}
828

829
static int32_t doDeleteFillResultImpl(SOperatorInfo* pOperator, TSKEY startTs, TSKEY endTs, uint64_t groupId) {
91✔
830
  int32_t                  code = TSDB_CODE_SUCCESS;
91✔
831
  int32_t                  lino = 0;
91✔
832
  SStorageAPI*             pAPI = &pOperator->pTaskInfo->storageAPI;
91✔
833
  SStreamFillOperatorInfo* pInfo = pOperator->info;
91✔
834
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
91✔
835
  getWindowFromDiscBuf(pOperator, startTs, groupId, pInfo->pFillSup);
91✔
836
  setDeleteFillValueInfo(startTs, endTs, pInfo->pFillSup, pInfo->pFillInfo);
91✔
837
  SWinKey key = {.ts = startTs, .groupId = groupId};
91✔
838
  pAPI->stateStore.streamStateFillDel(pOperator->pTaskInfo->streamInfo.pState, &key);
91✔
839
  if (!pInfo->pFillInfo->needFill) {
91✔
840
    code = buildDeleteResult(pOperator, startTs, endTs, groupId, pInfo->pDelRes);
51✔
841
    QUERY_CHECK_CODE(code, lino, _end);
51!
842
  } else {
843
    STimeRange tw = {
40✔
844
        .skey = startTs,
845
        .ekey = endTs,
846
        .groupId = groupId,
847
    };
848
    void* tmp = taosArrayPush(pInfo->pFillInfo->delRanges, &tw);
40✔
849
    if (!tmp) {
40!
850
      code = terrno;
×
851
      QUERY_CHECK_CODE(code, lino, _end);
×
852
    }
853
  }
854

855
_end:
91✔
856
  if (code != TSDB_CODE_SUCCESS) {
91!
857
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
858
  }
859
  return code;
91✔
860
}
861

862
static void getWindowInfoByKey(SStorageAPI* pAPI, void* pState, TSKEY ts, int64_t groupId, SResultRowData* pWinData) {
×
863
  SWinKey key = {.ts = ts, .groupId = groupId};
×
864
  void*   val = NULL;
×
865
  int32_t len = 0;
×
866
  int32_t code = pAPI->stateStore.streamStateFillGet(pState, &key, (void**)&val, &len, NULL);
×
867
  if (code != TSDB_CODE_SUCCESS) {
×
868
    qDebug("get window info by key failed, Data may be deleted, try next window. ts:%" PRId64 ", groupId:%" PRId64, ts,
×
869
           groupId);
870
    SStreamStateCur* pCur = pAPI->stateStore.streamStateFillSeekKeyNext(pState, &key);
×
871
    code = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &key, (const void**)&val, &len);
×
872
    pAPI->stateStore.streamStateFreeCur(pCur);
×
873
    qDebug("get window info by key ts:%" PRId64 ", groupId:%" PRId64 ", res%d", ts, groupId, code);
×
874
  }
875

876
  if (code == TSDB_CODE_SUCCESS) {
×
877
    resetFillWindow(pWinData);
×
878
    pWinData->key = key.ts;
×
879
    pWinData->pRowVal = val;
×
880
  }
881
}
×
882

883
static void doDeleteFillFinalize(SOperatorInfo* pOperator) {
2,276✔
884
  SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
2,276✔
885

886
  SStreamFillOperatorInfo* pInfo = pOperator->info;
2,276✔
887
  SStreamFillInfo*         pFillInfo = pInfo->pFillInfo;
2,276✔
888
  int32_t                  size = taosArrayGetSize(pFillInfo->delRanges);
2,276✔
889
  while (pFillInfo->delIndex < size) {
2,316✔
890
    STimeRange* range = taosArrayGet(pFillInfo->delRanges, pFillInfo->delIndex);
40✔
891
    if (pInfo->pRes->info.id.groupId != 0 && pInfo->pRes->info.id.groupId != range->groupId) {
40!
892
      return;
×
893
    }
894
    getWindowFromDiscBuf(pOperator, range->skey, range->groupId, pInfo->pFillSup);
40✔
895
    TSKEY realEnd = range->ekey + 1;
40✔
896
    if (pInfo->pFillInfo->type == TSDB_FILL_NEXT && pInfo->pFillSup->next.key != realEnd) {
40!
897
      getWindowInfoByKey(pAPI, pOperator->pTaskInfo->streamInfo.pState, realEnd, range->groupId,
×
898
                         &pInfo->pFillSup->next);
×
899
    }
900
    setDeleteFillValueInfo(range->skey, range->ekey, pInfo->pFillSup, pInfo->pFillInfo);
40✔
901
    pFillInfo->delIndex++;
40✔
902
    if (pInfo->pFillInfo->needFill) {
40!
903
      doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes);
40✔
904
      pInfo->pRes->info.id.groupId = range->groupId;
40✔
905
    }
906
  }
907
}
908

909
static int32_t doDeleteFillResult(SOperatorInfo* pOperator) {
81✔
910
  int32_t                  code = TSDB_CODE_SUCCESS;
81✔
911
  int32_t                  lino = 0;
81✔
912
  SStorageAPI*             pAPI = &pOperator->pTaskInfo->storageAPI;
81✔
913
  SStreamFillOperatorInfo* pInfo = pOperator->info;
81✔
914
  SStreamFillInfo*         pFillInfo = pInfo->pFillInfo;
81✔
915
  SSDataBlock*             pBlock = pInfo->pSrcDelBlock;
81✔
916
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
81✔
917

918
  SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
81✔
919
  TSKEY*           tsStarts = (TSKEY*)pStartCol->pData;
81✔
920
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
81✔
921
  uint64_t*        groupIds = (uint64_t*)pGroupCol->pData;
81✔
922
  while (pInfo->srcDelRowIndex < pBlock->info.rows) {
208✔
923
    TSKEY            ts = tsStarts[pInfo->srcDelRowIndex];
127✔
924
    TSKEY            endTs = ts;
127✔
925
    uint64_t         groupId = groupIds[pInfo->srcDelRowIndex];
127✔
926
    SWinKey          key = {.ts = ts, .groupId = groupId};
127✔
927
    SStreamStateCur* pCur = pAPI->stateStore.streamStateGetAndCheckCur(pOperator->pTaskInfo->streamInfo.pState, &key);
127✔
928

929
    if (!pCur) {
127✔
930
      pInfo->srcDelRowIndex++;
36✔
931
      continue;
36✔
932
    }
933

934
    SWinKey nextKey = {.groupId = groupId, .ts = ts};
91✔
935
    while (pInfo->srcDelRowIndex < pBlock->info.rows) {
271✔
936
      TSKEY    delTs = tsStarts[pInfo->srcDelRowIndex];
226✔
937
      uint64_t delGroupId = groupIds[pInfo->srcDelRowIndex];
226✔
938
      int32_t  winCode = TSDB_CODE_SUCCESS;
226✔
939
      if (groupId != delGroupId) {
226!
940
        break;
46✔
941
      }
942
      if (delTs > nextKey.ts) {
226✔
943
        break;
10✔
944
      }
945

946
      SWinKey delKey = {.groupId = delGroupId, .ts = delTs};
216✔
947
      if (delTs == nextKey.ts) {
216✔
948
        pAPI->stateStore.streamStateCurNext(pOperator->pTaskInfo->streamInfo.pState, pCur);
156✔
949
        winCode = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &nextKey, NULL, NULL);
156✔
950
        // ts will be deleted later
951
        if (delTs != ts) {
156✔
952
          pAPI->stateStore.streamStateFillDel(pOperator->pTaskInfo->streamInfo.pState, &delKey);
65✔
953
          pAPI->stateStore.streamStateFreeCur(pCur);
65✔
954
          pCur = pAPI->stateStore.streamStateGetAndCheckCur(pOperator->pTaskInfo->streamInfo.pState, &nextKey);
65✔
955
        }
956
        endTs = TMAX(delTs, nextKey.ts - 1);
156✔
957
        if (winCode != TSDB_CODE_SUCCESS) {
156✔
958
          break;
36✔
959
        }
960
      }
961
      pInfo->srcDelRowIndex++;
180✔
962
    }
963

964
    pAPI->stateStore.streamStateFreeCur(pCur);
91✔
965
    code = doDeleteFillResultImpl(pOperator, ts, endTs, groupId);
91✔
966
    QUERY_CHECK_CODE(code, lino, _end);
91!
967
  }
968

969
  pFillInfo->current = pFillInfo->end + 1;
81✔
970

971
_end:
81✔
972
  if (code != TSDB_CODE_SUCCESS) {
81!
973
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
974
  }
975
  return code;
81✔
976
}
977

978
void resetStreamFillSup(SStreamFillSupporter* pFillSup) {
2,408✔
979
  tSimpleHashClear(pFillSup->pResMap);
2,408✔
980
  pFillSup->hasDelete = false;
2,408✔
981
}
2,408✔
982
void resetStreamFillInfo(SStreamFillOperatorInfo* pInfo) {
2,245✔
983
  resetStreamFillSup(pInfo->pFillSup);
2,245✔
984
  taosArrayClear(pInfo->pFillInfo->delRanges);
2,245✔
985
  pInfo->pFillInfo->delIndex = 0;
2,245✔
986
}
2,245✔
987

988
static int32_t doApplyStreamScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pSrcBlock,
812✔
989
                                              SSDataBlock* pDstBlock) {
990
  int32_t                  code = TSDB_CODE_SUCCESS;
812✔
991
  int32_t                  lino = 0;
812✔
992
  SStreamFillOperatorInfo* pInfo = pOperator->info;
812✔
993
  SExprSupp*               pSup = &pOperator->exprSupp;
812✔
994
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
812✔
995

996
  blockDataCleanup(pDstBlock);
812✔
997
  code = blockDataEnsureCapacity(pDstBlock, pSrcBlock->info.rows);
812✔
998
  QUERY_CHECK_CODE(code, lino, _end);
812!
999

1000
  code = setInputDataBlock(pSup, pSrcBlock, TSDB_ORDER_ASC, MAIN_SCAN, false);
812✔
1001
  QUERY_CHECK_CODE(code, lino, _end);
812!
1002
  code = projectApplyFunctions(pSup->pExprInfo, pDstBlock, pSrcBlock, pSup->pCtx, pSup->numOfExprs, NULL);
812✔
1003
  QUERY_CHECK_CODE(code, lino, _end);
812!
1004

1005
  pDstBlock->info.rows = 0;
812✔
1006
  pSup = &pInfo->pFillSup->notFillExprSup;
812✔
1007
  code = setInputDataBlock(pSup, pSrcBlock, TSDB_ORDER_ASC, MAIN_SCAN, false);
812✔
1008
  QUERY_CHECK_CODE(code, lino, _end);
812!
1009
  code = projectApplyFunctions(pSup->pExprInfo, pDstBlock, pSrcBlock, pSup->pCtx, pSup->numOfExprs, NULL);
812✔
1010
  QUERY_CHECK_CODE(code, lino, _end);
812!
1011

1012
  pDstBlock->info.id.groupId = pSrcBlock->info.id.groupId;
812✔
1013

1014
  code = blockDataUpdateTsWindow(pDstBlock, pInfo->primaryTsCol);
812✔
1015

1016
_end:
812✔
1017
  if (code != TSDB_CODE_SUCCESS) {
812!
1018
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
1019
  }
1020
  return code;
812✔
1021
}
1022

1023
static int32_t doStreamFillNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
3,165✔
1024
  int32_t                  code = TSDB_CODE_SUCCESS;
3,165✔
1025
  int32_t                  lino = 0;
3,165✔
1026
  SStreamFillOperatorInfo* pInfo = pOperator->info;
3,165✔
1027
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
3,165✔
1028

1029
  if (pOperator->status == OP_EXEC_DONE) {
3,165!
1030
    (*ppRes) = NULL;
×
1031
    return code;
×
1032
  }
1033
  blockDataCleanup(pInfo->pRes);
3,165✔
1034
  if (hasRemainCalc(pInfo->pFillInfo) ||
3,165✔
1035
      (pInfo->pFillInfo->pos != FILL_POS_INVALID && pInfo->pFillInfo->needFill == true)) {
3,158✔
1036
    doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes);
10✔
1037
    if (pInfo->pRes->info.rows > 0) {
10!
1038
      printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
10✔
1039
      (*ppRes) = pInfo->pRes;
10✔
1040
      return code;
10✔
1041
    }
1042
  }
1043
  if (pOperator->status == OP_RES_TO_RETURN) {
3,155✔
1044
    doDeleteFillFinalize(pOperator);
31✔
1045
    if (pInfo->pRes->info.rows > 0) {
31!
1046
      printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
×
1047
      (*ppRes) = pInfo->pRes;
×
1048
      return code;
×
1049
    }
1050
    setOperatorCompleted(pOperator);
31✔
1051
    resetStreamFillInfo(pInfo);
31✔
1052
    (*ppRes) = NULL;
31✔
1053
    return code;
31✔
1054
  }
1055

1056
  SSDataBlock*   fillResult = NULL;
3,124✔
1057
  SOperatorInfo* downstream = pOperator->pDownstream[0];
3,124✔
1058
  while (1) {
1059
    if (pInfo->srcRowIndex >= pInfo->pSrcBlock->info.rows || pInfo->pSrcBlock->info.rows == 0) {
3,154!
1060
      // If there are delete datablocks, we receive  them first.
1061
      SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
3,154✔
1062
      if (pBlock == NULL) {
3,154✔
1063
        pOperator->status = OP_RES_TO_RETURN;
2,245✔
1064
        pInfo->pFillInfo->preRowKey = INT64_MIN;
2,245✔
1065
        if (pInfo->pRes->info.rows > 0) {
2,245!
1066
          printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
×
1067
          (*ppRes) = pInfo->pRes;
×
1068
          return code;
×
1069
        }
1070
        break;
2,245✔
1071
      }
1072
      printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
909✔
1073

1074
      if (pInfo->pFillInfo->curGroupId != pBlock->info.id.groupId) {
909✔
1075
        pInfo->pFillInfo->curGroupId = pBlock->info.id.groupId;
160✔
1076
        pInfo->pFillInfo->preRowKey = INT64_MIN;
160✔
1077
      }
1078

1079
      pInfo->pFillSup->winRange = pTaskInfo->streamInfo.fillHistoryWindow;
909✔
1080
      if (pInfo->pFillSup->winRange.ekey <= 0) {
909!
1081
        pInfo->pFillSup->winRange.ekey = INT64_MAX;
×
1082
      }
1083

1084
      switch (pBlock->info.type) {
909!
1085
        case STREAM_RETRIEVE:
5✔
1086
          (*ppRes) = pBlock;
5✔
1087
          return code;
5✔
1088
        case STREAM_DELETE_RESULT: {
81✔
1089
          pInfo->pSrcDelBlock = pBlock;
81✔
1090
          pInfo->srcDelRowIndex = 0;
81✔
1091
          blockDataCleanup(pInfo->pDelRes);
81✔
1092
          pInfo->pFillSup->hasDelete = true;
81✔
1093
          code = doDeleteFillResult(pOperator);
81✔
1094
          QUERY_CHECK_CODE(code, lino, _end);
81!
1095

1096
          if (pInfo->pDelRes->info.rows > 0) {
81✔
1097
            printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
51✔
1098
            (*ppRes) = pInfo->pDelRes;
51✔
1099
            return code;
51✔
1100
          }
1101
          continue;
30✔
1102
        } break;
1103
        case STREAM_NORMAL:
754✔
1104
        case STREAM_INVALID:
1105
        case STREAM_PULL_DATA: {
1106
          code = doApplyStreamScalarCalculation(pOperator, pBlock, pInfo->pSrcBlock);
754✔
1107
          QUERY_CHECK_CODE(code, lino, _end);
754!
1108

1109
          memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
754✔
1110
          pInfo->srcRowIndex = -1;
754✔
1111
        } break;
754✔
1112
        case STREAM_CHECKPOINT:
69✔
1113
        case STREAM_CREATE_CHILD_TABLE: {
1114
          (*ppRes) = pBlock;
69✔
1115
          return code;
69✔
1116
        } break;
1117
        default:
×
1118
          return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1119
      }
1120
    }
1121

1122
    doStreamFillImpl(pOperator);
754✔
1123
    code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo);
754✔
1124
    QUERY_CHECK_CODE(code, lino, _end);
754!
1125

1126
    memcpy(pInfo->pRes->info.parTbName, pInfo->pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
754✔
1127
    pOperator->resultInfo.totalRows += pInfo->pRes->info.rows;
754✔
1128
    if (pInfo->pRes->info.rows > 0) {
754!
1129
      break;
754✔
1130
    }
1131
  }
1132
  if (pOperator->status == OP_RES_TO_RETURN) {
2,999✔
1133
    doDeleteFillFinalize(pOperator);
2,245✔
1134
  }
1135

1136
  if (pInfo->pRes->info.rows == 0) {
2,999✔
1137
    setOperatorCompleted(pOperator);
2,214✔
1138
    resetStreamFillInfo(pInfo);
2,214✔
1139
    (*ppRes) = NULL;
2,214✔
1140
    return code;
2,214✔
1141
  }
1142

1143
  pOperator->resultInfo.totalRows += pInfo->pRes->info.rows;
785✔
1144
  printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
785✔
1145
  (*ppRes) = pInfo->pRes;
785✔
1146
  return code;
785✔
1147

1148
_end:
×
1149
  if (code != TSDB_CODE_SUCCESS) {
×
1150
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
1151
    pTaskInfo->code = code;
×
1152
    T_LONG_JMP(pTaskInfo->env, code);
×
1153
  }
1154
  setOperatorCompleted(pOperator);
×
1155
  resetStreamFillInfo(pInfo);
×
1156
  (*ppRes) = NULL;
×
1157
  return code;
×
1158
}
1159

1160
void doBuildForceFillResultImpl(SOperatorInfo* pOperator, SStreamFillSupporter* pFillSup,
230✔
1161
                                SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) {
1162
  int32_t code = TSDB_CODE_SUCCESS;
230✔
1163
  int32_t lino = 0;
230✔
1164
  SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
230✔
1165
  void*        pState = pOperator->pTaskInfo->streamInfo.pState;
230✔
1166
  bool    res = false;
230✔
1167
  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
230✔
1168
  for (; pGroupResInfo->index < numOfRows; pGroupResInfo->index++) {
517✔
1169
    SWinKey* pKey = (SWinKey*)taosArrayGet(pGroupResInfo->pRows, pGroupResInfo->index);
361✔
1170
    if (pBlock->info.id.groupId == 0) {
361✔
1171
      pBlock->info.id.groupId = pKey->groupId;
230✔
1172
    } else if (pBlock->info.id.groupId != pKey->groupId) {
131✔
1173
      break;
74✔
1174
    }
1175
    void*    val = NULL;
287✔
1176
    int32_t  len = 0;
287✔
1177
    int32_t  winCode = pAPI->stateStore.streamStateFillGet(pOperator->pTaskInfo->streamInfo.pState, pKey, (void**)&val, &len, NULL);
287✔
1178
    qDebug("===stream=== build force fill res. key:%" PRId64 ",groupId:%" PRId64".res:%d", pKey->ts, pKey->groupId, winCode);
287!
1179
    if (winCode == TSDB_CODE_SUCCESS) {
287✔
1180
      pFillSup->cur.key = pKey->ts;
100✔
1181
      pFillSup->cur.pRowVal = val;
100✔
1182
      code = buildFillResult(&pFillSup->cur, pFillSup, pKey->ts, pBlock, &res);
100✔
1183
      QUERY_CHECK_CODE(code, lino, _end);
100!
1184
      resetFillWindow(&pFillSup->cur);
100✔
1185
    } else {
1186
      SStreamStateCur* pCur = pAPI->stateStore.streamStateFillSeekKeyPrev(pState, pKey);
187✔
1187
      SWinKey          preKey = {.ts = INT64_MIN, .groupId = pKey->groupId};
187✔
1188
      void*            preVal = NULL;
187✔
1189
      int32_t          preVLen = 0;
187✔
1190
      winCode = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &preKey, (const void**)&preVal, &preVLen);
187✔
1191
      if (winCode == TSDB_CODE_SUCCESS) {
187!
1192
        pFillSup->cur.key = pKey->ts;
187✔
1193
        pFillSup->cur.pRowVal = preVal;
187✔
1194
        if (pFillInfo->type == TSDB_FILL_PREV) {
187✔
1195
          code = buildFillResult(&pFillSup->cur, pFillSup, pKey->ts, pBlock, &res);
112✔
1196
          QUERY_CHECK_CODE(code, lino, _end);
112!
1197
        } else {
1198
          copyNotFillExpData(pFillSup, pFillInfo);
75✔
1199
          pFillInfo->pResRow->key = pKey->ts;
75✔
1200
          code = buildFillResult(pFillInfo->pResRow, pFillSup, pKey->ts, pBlock, &res);
75✔
1201
          QUERY_CHECK_CODE(code, lino, _end);
75!
1202
        }
1203
        resetFillWindow(&pFillSup->cur);
187✔
1204
      }
1205
      pAPI->stateStore.streamStateFreeCur(pCur);
187✔
1206
    }
1207
  }
1208

1209
_end:
156✔
1210
  if (code != TSDB_CODE_SUCCESS) {
230!
1211
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1212
  }
1213
}
230✔
1214

1215
void doBuildForceFillResult(SOperatorInfo* pOperator, SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo,
641✔
1216
                            SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) {
1217
  blockDataCleanup(pBlock);
641✔
1218
  if (!hasRemainResults(pGroupResInfo)) {
641✔
1219
    return;
411✔
1220
  }
1221

1222
  // clear the existed group id
1223
  pBlock->info.id.groupId = 0;
230✔
1224
  doBuildForceFillResultImpl(pOperator, pFillSup, pFillInfo, pBlock, pGroupResInfo);
230✔
1225
}
1226

1227
static int32_t buildForceFillResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
641✔
1228
  int32_t                  code = TSDB_CODE_SUCCESS;
641✔
1229
  int32_t                  lino = 0;
641✔
1230
  SStreamFillOperatorInfo* pInfo = pOperator->info;
641✔
1231
  uint16_t                 opType = pOperator->operatorType;
641✔
1232
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
641✔
1233

1234
  doBuildForceFillResult(pOperator, pInfo->pFillSup, pInfo->pFillInfo, pInfo->pRes, &pInfo->groupResInfo);
641✔
1235
  if (pInfo->pRes->info.rows != 0) {
641✔
1236
    printDataBlock(pInfo->pRes, getStreamOpName(opType), GET_TASKID(pTaskInfo));
230✔
1237
    (*ppRes) = pInfo->pRes;
230✔
1238
    goto _end;
230✔
1239
  }
1240

1241
  (*ppRes) = NULL;
411✔
1242

1243
_end:
641✔
1244
  if (code != TSDB_CODE_SUCCESS) {
641!
1245
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1246
  }
1247
  return code;
641✔
1248
}
1249

1250
// force window close impl
1251
static int32_t doStreamForceFillImpl(SOperatorInfo* pOperator) {
58✔
1252
  int32_t                  code = TSDB_CODE_SUCCESS;
58✔
1253
  int32_t                  lino = 0;
58✔
1254
  SStreamFillOperatorInfo* pInfo = pOperator->info;
58✔
1255
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
58✔
1256
  SStreamFillSupporter*    pFillSup = pInfo->pFillSup;
58✔
1257
  SStreamFillInfo*         pFillInfo = pInfo->pFillInfo;
58✔
1258
  SSDataBlock*             pBlock = pInfo->pSrcBlock;
58✔
1259
  uint64_t                 groupId = pBlock->info.id.groupId;
58✔
1260
  SStreamAggSupporter*     pAggSup = pInfo->pStreamAggSup;
58✔
1261
  SColumnInfoData*         pTsCol = taosArrayGet(pInfo->pSrcBlock->pDataBlock, pInfo->primaryTsCol);
58✔
1262
  TSKEY*                   tsCol = (TSKEY*)pTsCol->pData;
58✔
1263
  for (int32_t i = 0; i < pBlock->info.rows; i++){
158✔
1264
    code = keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, i, groupId, pFillSup->rowSize);
100✔
1265
    QUERY_CHECK_CODE(code, lino, _end);
100!
1266

1267
    int32_t size =  taosArrayGetSize(pInfo->pCloseTs);
100✔
1268
    if (size > 0) {
100!
1269
      TSKEY* pTs = (TSKEY*) taosArrayGet(pInfo->pCloseTs, 0);
100✔
1270
      TSKEY  resTs = tsCol[i];
100✔
1271
      while (resTs < (*pTs)) {
129✔
1272
        SWinKey key = {.groupId = groupId, .ts = resTs};
57✔
1273
        void* pPushRes = taosArrayPush(pInfo->pUpdated, &key);
57✔
1274
        QUERY_CHECK_NULL(pPushRes, code, lino, _end, terrno);
57!
1275

1276
        if (IS_FILL_CONST_VALUE(pFillSup->type)) {
57!
1277
          break;
1278
        }
1279
        resTs = taosTimeAdd(resTs, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
29✔
1280
                            pFillSup->interval.precision);
29✔
1281
      }
1282
    }
1283
  }
1284
  code = pAggSup->stateStore.streamStateGroupPut(pAggSup->pState, groupId, NULL, 0);
58✔
1285
  QUERY_CHECK_CODE(code, lino, _end);
58!
1286

1287
_end:
58✔
1288
  if (code != TSDB_CODE_SUCCESS) {
58!
1289
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
1290
  }
1291
  return code;
58✔
1292
}
1293

1294
int32_t buildAllResultKey(SStreamAggSupporter* pAggSup, TSKEY ts, SArray* pUpdated) {
562✔
1295
  int32_t          code = TSDB_CODE_SUCCESS;
562✔
1296
  int32_t          lino = 0;
562✔
1297
  int64_t          groupId = 0;
562✔
1298
  SStreamStateCur* pCur = pAggSup->stateStore.streamStateGroupGetCur(pAggSup->pState);
562✔
1299
  while (1) {  
372✔
1300
    int32_t winCode = pAggSup->stateStore.streamStateGroupGetKVByCur(pCur, &groupId, NULL, NULL);
934✔
1301
    if (winCode != TSDB_CODE_SUCCESS) {
934✔
1302
      break;
562✔
1303
    }
1304
    SWinKey key = {.ts = ts, .groupId = groupId};
372✔
1305
    void* pPushRes = taosArrayPush(pUpdated, &key);
372✔
1306
    QUERY_CHECK_NULL(pPushRes, code, lino, _end, terrno);
372!
1307

1308
    pAggSup->stateStore.streamStateGroupCurNext(pCur);
372✔
1309
  }
1310
  pAggSup->stateStore.streamStateFreeCur(pCur);
562✔
1311
  pCur = NULL;
562✔
1312

1313
_end:
562✔
1314
  if (code != TSDB_CODE_SUCCESS) {
562!
1315
    pAggSup->stateStore.streamStateFreeCur(pCur);
×
1316
    pCur = NULL;
×
1317
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1318
  }
1319
  return code;
562✔
1320
}
1321

1322
static void removeDuplicateResult(SArray* pTsArrray, __compar_fn_t fn) {
411✔
1323
  taosArraySort(pTsArrray, fn);
411✔
1324
  taosArrayRemoveDuplicate(pTsArrray, fn, NULL);
411✔
1325
}
411✔
1326

1327
// force window close
1328
static int32_t doStreamForceFillNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
641✔
1329
  int32_t                  code = TSDB_CODE_SUCCESS;
641✔
1330
  int32_t                  lino = 0;
641✔
1331
  SStreamFillOperatorInfo* pInfo = pOperator->info;
641✔
1332
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
641✔
1333

1334
  if (pOperator->status == OP_EXEC_DONE) {
641!
1335
    (*ppRes) = NULL;
×
1336
    return code;
×
1337
  }
1338

1339
  if (pOperator->status == OP_RES_TO_RETURN) {
641✔
1340
    SSDataBlock* resBlock = NULL;
230✔
1341
    code = buildForceFillResult(pOperator, &resBlock);
230✔
1342
    QUERY_CHECK_CODE(code, lino, _end);
230!
1343

1344
    if (resBlock != NULL) {
230✔
1345
      (*ppRes) = resBlock;
74✔
1346
      goto _end;
74✔
1347
    }
1348
    pInfo->pStreamAggSup->stateStore.streamStateClearExpiredState(pInfo->pStreamAggSup->pState);
156✔
1349
    setStreamOperatorCompleted(pOperator);
156✔
1350
    (*ppRes) = NULL;
156✔
1351
    goto _end;
156✔
1352
  }
1353

1354
  SSDataBlock*   fillResult = NULL;
411✔
1355
  SOperatorInfo* downstream = pOperator->pDownstream[0];
411✔
1356
  while (1) {
457✔
1357
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
868✔
1358
    if (pBlock == NULL) {
868✔
1359
      pOperator->status = OP_RES_TO_RETURN;
411✔
1360
      qDebug("===stream===return data:%s.", getStreamOpName(pOperator->operatorType));
411!
1361
      break;
411✔
1362
    }
1363
    printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
457✔
1364
    setStreamOperatorState(&pInfo->basic, pBlock->info.type);
457✔
1365

1366
    switch (pBlock->info.type) {
457!
1367
      case STREAM_NORMAL:
58✔
1368
      case STREAM_INVALID: {
1369
        code = doApplyStreamScalarCalculation(pOperator, pBlock, pInfo->pSrcBlock);
58✔
1370
        QUERY_CHECK_CODE(code, lino, _end);
58!
1371

1372
        memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
58✔
1373
        pInfo->srcRowIndex = -1;
58✔
1374
      } break;
58✔
1375
      case STREAM_CHECKPOINT:
×
1376
      case STREAM_CREATE_CHILD_TABLE: {
1377
        (*ppRes) = pBlock;
×
1378
        goto _end;
×
1379
      } break;
1380
      case STREAM_GET_RESULT: {
798✔
1381
        void* pPushRes = taosArrayPush(pInfo->pCloseTs, &pBlock->info.window.skey);
399✔
1382
        QUERY_CHECK_NULL(pPushRes, code, lino, _end, terrno);
399!
1383
        continue;
399✔
1384
      }
1385
      default:
×
1386
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1387
        QUERY_CHECK_CODE(code, lino, _end);
×
1388
    }
1389

1390
    code = doStreamForceFillImpl(pOperator);
58✔
1391
    QUERY_CHECK_CODE(code, lino, _end);
58!
1392
  }
1393

1394
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pCloseTs); i++) {
810✔
1395
    TSKEY ts = *(TSKEY*) taosArrayGet(pInfo->pCloseTs, i);
399✔
1396
    code = buildAllResultKey(pInfo->pStreamAggSup, ts, pInfo->pUpdated);
399✔
1397
    QUERY_CHECK_CODE(code, lino, _end);
399!
1398
  }
1399
  taosArrayClear(pInfo->pCloseTs);
411✔
1400
  removeDuplicateResult(pInfo->pUpdated, winKeyCmprImpl);
411✔
1401

1402
  initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
411✔
1403
  pInfo->groupResInfo.freeItem = false;
411✔
1404

1405
  pInfo->pUpdated = taosArrayInit(1024, sizeof(SWinKey));
411✔
1406
  QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);
411!
1407

1408
  code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
411✔
1409
  QUERY_CHECK_CODE(code, lino, _end);
411!
1410

1411
  code = buildForceFillResult(pOperator, ppRes);
411✔
1412
  QUERY_CHECK_CODE(code, lino, _end);
411!
1413

1414
  if ((*ppRes) == NULL) {
411✔
1415
    pInfo->pStreamAggSup->stateStore.streamStateClearExpiredState(pInfo->pStreamAggSup->pState);
255✔
1416
    setStreamOperatorCompleted(pOperator);
255✔
1417
  }
1418

1419
_end:
156✔
1420
  if (code != TSDB_CODE_SUCCESS) {
641!
1421
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
1422
    pTaskInfo->code = code;
×
1423
  }
1424
  return code;
641✔
1425
}
1426

1427
static int32_t initResultBuf(SSDataBlock* pInputRes, SStreamFillSupporter* pFillSup) {
227✔
1428
  int32_t numOfCols = taosArrayGetSize(pInputRes->pDataBlock);
227✔
1429
  pFillSup->rowSize = sizeof(SResultCellData) * numOfCols;
227✔
1430
  for (int i = 0; i < numOfCols; i++) {
3,578✔
1431
    SColumnInfoData* pCol = taosArrayGet(pInputRes->pDataBlock, i);
3,351✔
1432
    pFillSup->rowSize += pCol->info.bytes;
3,351✔
1433
  }
1434
  pFillSup->next.key = INT64_MIN;
227✔
1435
  pFillSup->nextNext.key = INT64_MIN;
227✔
1436
  pFillSup->prev.key = INT64_MIN;
227✔
1437
  pFillSup->cur.key = INT64_MIN;
227✔
1438
  pFillSup->next.pRowVal = NULL;
227✔
1439
  pFillSup->nextNext.pRowVal = NULL;
227✔
1440
  pFillSup->prev.pRowVal = NULL;
227✔
1441
  pFillSup->cur.pRowVal = NULL;
227✔
1442

1443
  return TSDB_CODE_SUCCESS;
227✔
1444
}
1445

1446
static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNode, SInterval* pInterval,
227✔
1447
                                               SExprInfo* pFillExprInfo, int32_t numOfFillCols, SStorageAPI* pAPI, SSDataBlock* pInputRes) {
1448
  int32_t               code = TSDB_CODE_SUCCESS;
227✔
1449
  int32_t               lino = 0;
227✔
1450
  SStreamFillSupporter* pFillSup = taosMemoryCalloc(1, sizeof(SStreamFillSupporter));
227✔
1451
  if (!pFillSup) {
227!
1452
    code = terrno;
×
1453
    QUERY_CHECK_CODE(code, lino, _end);
×
1454
  }
1455
  pFillSup->numOfFillCols = numOfFillCols;
227✔
1456
  int32_t    numOfNotFillCols = 0;
227✔
1457
  SExprInfo* noFillExprInfo = NULL;
227✔
1458

1459
  code = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &noFillExprInfo, &numOfNotFillCols);
227✔
1460
  QUERY_CHECK_CODE(code, lino, _end);
227!
1461

1462
  pFillSup->pAllColInfo = createFillColInfo(pFillExprInfo, pFillSup->numOfFillCols, noFillExprInfo, numOfNotFillCols,
454✔
1463
                                            NULL, 0, (const SNodeListNode*)(pPhyFillNode->pValues));
227✔
1464
  if (pFillSup->pAllColInfo == NULL) {
227!
1465
    code = terrno;
×
1466
    lino = __LINE__;
×
1467
    destroyExprInfo(noFillExprInfo, numOfNotFillCols);
×
1468
    goto _end;
×
1469
  }
1470

1471
  pFillSup->type = convertFillType(pPhyFillNode->mode);
227✔
1472
  pFillSup->numOfAllCols = pFillSup->numOfFillCols + numOfNotFillCols;
227✔
1473
  pFillSup->interval = *pInterval;
227✔
1474
  pFillSup->pAPI = pAPI;
227✔
1475

1476
  code = initResultBuf(pInputRes, pFillSup);
227✔
1477
  QUERY_CHECK_CODE(code, lino, _end);
227!
1478

1479
  SExprInfo* noFillExpr = NULL;
227✔
1480
  code = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &noFillExpr, &numOfNotFillCols);
227✔
1481
  QUERY_CHECK_CODE(code, lino, _end);
227!
1482

1483
  code = initExprSupp(&pFillSup->notFillExprSup, noFillExpr, numOfNotFillCols, &pAPI->functionStore);
227✔
1484
  QUERY_CHECK_CODE(code, lino, _end);
227!
1485

1486
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
227✔
1487
  pFillSup->pResMap = tSimpleHashInit(16, hashFn);
227✔
1488
  QUERY_CHECK_NULL(pFillSup->pResMap, code, lino, _end, terrno);
227!
1489
  pFillSup->hasDelete = false;
227✔
1490

1491
_end:
227✔
1492
  if (code != TSDB_CODE_SUCCESS) {
227!
1493
    destroyStreamFillSupporter(pFillSup);
×
1494
    pFillSup = NULL;
×
1495
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1496
  }
1497
  return pFillSup;
227✔
1498
}
1499

1500
SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock* pRes) {
238✔
1501
  int32_t          code = TSDB_CODE_SUCCESS;
238✔
1502
  int32_t          lino = 0;
238✔
1503
  SStreamFillInfo* pFillInfo = taosMemoryCalloc(1, sizeof(SStreamFillInfo));
238✔
1504
  if (!pFillInfo) {
238!
1505
    code = terrno;
×
1506
    QUERY_CHECK_CODE(code, lino, _end);
×
1507
  }
1508

1509
  pFillInfo->start = INT64_MIN;
238✔
1510
  pFillInfo->current = INT64_MIN;
238✔
1511
  pFillInfo->end = INT64_MIN;
238✔
1512
  pFillInfo->preRowKey = INT64_MIN;
238✔
1513
  pFillInfo->needFill = false;
238✔
1514
  pFillInfo->pLinearInfo = taosMemoryCalloc(1, sizeof(SStreamFillLinearInfo));
238✔
1515
  if (!pFillInfo) {
238!
1516
    code = terrno;
×
1517
    QUERY_CHECK_CODE(code, lino, _end);
×
1518
  }
1519

1520
  pFillInfo->pLinearInfo->hasNext = false;
238✔
1521
  pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
238✔
1522
  pFillInfo->pLinearInfo->pEndPoints = NULL;
238✔
1523
  pFillInfo->pLinearInfo->pNextEndPoints = NULL;
238✔
1524
  if (pFillSup->type == TSDB_FILL_LINEAR) {
238✔
1525
    pFillInfo->pLinearInfo->pEndPoints = taosArrayInit(pFillSup->numOfAllCols, sizeof(SPoint));
44✔
1526
    if (!pFillInfo->pLinearInfo->pEndPoints) {
44!
1527
      code = terrno;
×
1528
      QUERY_CHECK_CODE(code, lino, _end);
×
1529
    }
1530

1531
    pFillInfo->pLinearInfo->pNextEndPoints = taosArrayInit(pFillSup->numOfAllCols, sizeof(SPoint));
44✔
1532
    if (!pFillInfo->pLinearInfo->pNextEndPoints) {
44!
1533
      code = terrno;
×
1534
      QUERY_CHECK_CODE(code, lino, _end);
×
1535
    }
1536

1537
    for (int32_t i = 0; i < pFillSup->numOfAllCols; i++) {
667✔
1538
      SColumnInfoData* pColData = taosArrayGet(pRes->pDataBlock, i);
623✔
1539
      if (pColData == NULL) {
623!
1540
        SPoint dummy = {0};
×
1541
        dummy.val = taosMemoryCalloc(1, 1);
×
1542
        void* tmpRes = taosArrayPush(pFillInfo->pLinearInfo->pEndPoints, &dummy);
×
1543
        QUERY_CHECK_NULL(tmpRes, code, lino, _end, terrno);
×
1544

1545
        dummy.val = taosMemoryCalloc(1, 1);
×
1546
        tmpRes = taosArrayPush(pFillInfo->pLinearInfo->pNextEndPoints, &dummy);
×
1547
        QUERY_CHECK_NULL(tmpRes, code, lino, _end, terrno);
×
1548

1549
        continue;
×
1550
      }
1551
      SPoint value = {0};
623✔
1552
      value.val = taosMemoryCalloc(1, pColData->info.bytes);
623✔
1553
      QUERY_CHECK_NULL(value.val, code, lino, _end, terrno);
623!
1554

1555
      void* tmpRes = taosArrayPush(pFillInfo->pLinearInfo->pEndPoints, &value);
623✔
1556
      QUERY_CHECK_NULL(tmpRes, code, lino, _end, terrno);
623!
1557

1558
      value.val = taosMemoryCalloc(1, pColData->info.bytes);
623✔
1559
      QUERY_CHECK_NULL(value.val, code, lino, _end, terrno);
623!
1560

1561
      tmpRes = taosArrayPush(pFillInfo->pLinearInfo->pNextEndPoints, &value);
623✔
1562
      QUERY_CHECK_NULL(tmpRes, code, lino, _end, terrno);
623!
1563
    }
1564
  }
1565
  pFillInfo->pLinearInfo->winIndex = 0;
238✔
1566

1567
  pFillInfo->pNonFillRow = NULL;
238✔
1568
  pFillInfo->pResRow = NULL;
238✔
1569
  if (pFillSup->type == TSDB_FILL_SET_VALUE || pFillSup->type == TSDB_FILL_SET_VALUE_F ||
238✔
1570
      pFillSup->type == TSDB_FILL_NULL || pFillSup->type == TSDB_FILL_NULL_F) {
193✔
1571
    pFillInfo->pResRow = taosMemoryCalloc(1, sizeof(SResultRowData));
105✔
1572
    QUERY_CHECK_NULL(pFillInfo->pResRow, code, lino, _end, terrno);
105!
1573

1574
    pFillInfo->pResRow->key = INT64_MIN;
105✔
1575
    pFillInfo->pResRow->pRowVal = taosMemoryCalloc(1, pFillSup->rowSize);
105✔
1576
    if (!pFillInfo->pResRow->pRowVal) {
105!
1577
      code = terrno;
×
1578
      QUERY_CHECK_CODE(code, lino, _end);
×
1579
    }
1580

1581
    for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) {
1,627✔
1582
      SColumnInfoData* pColData = taosArrayGet(pRes->pDataBlock, i);
1,522✔
1583
      SResultCellData* pCell = getResultCell(pFillInfo->pResRow, i);
1,522✔
1584
      if (pColData == NULL) {
1,522✔
1585
        pCell->bytes = 1;
2✔
1586
        pCell->type = 4;
2✔
1587
        continue;
2✔
1588
      }
1589
      pCell->bytes = pColData->info.bytes;
1,520✔
1590
      pCell->type = pColData->info.type;
1,520✔
1591
    }
1592

1593
    pFillInfo->pNonFillRow = taosMemoryCalloc(1, sizeof(SResultRowData));
105✔
1594
    QUERY_CHECK_NULL(pFillInfo->pNonFillRow, code, lino, _end, terrno);
105!
1595
    pFillInfo->pNonFillRow->key = INT64_MIN;
105✔
1596
    pFillInfo->pNonFillRow->pRowVal = taosMemoryCalloc(1, pFillSup->rowSize);
105✔
1597
    memcpy(pFillInfo->pNonFillRow->pRowVal, pFillInfo->pResRow->pRowVal, pFillSup->rowSize);
105✔
1598
  }
1599

1600
  pFillInfo->type = pFillSup->type;
238✔
1601
  pFillInfo->delRanges = taosArrayInit(16, sizeof(STimeRange));
238✔
1602
  if (!pFillInfo->delRanges) {
238!
1603
    code = terrno;
×
1604
    QUERY_CHECK_CODE(code, lino, _end);
×
1605
  }
1606

1607
  pFillInfo->delIndex = 0;
238✔
1608
  pFillInfo->curGroupId = 0;
238✔
1609
  pFillInfo->hasNext = false;
238✔
1610
  return pFillInfo;
238✔
1611

1612
_end:
×
1613
  if (code != TSDB_CODE_SUCCESS) {
×
1614
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1615
  }
1616
  destroyStreamFillInfo(pFillInfo);
×
1617
  return NULL;
×
1618
}
1619

1620
static void setValueForFillInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo) {
227✔
1621
  if (pFillInfo->type == TSDB_FILL_SET_VALUE || pFillInfo->type == TSDB_FILL_SET_VALUE_F) {
227✔
1622
    for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) {
673✔
1623
      SFillColInfo*    pFillCol = pFillSup->pAllColInfo + i;
630✔
1624
      int32_t          slotId = GET_DEST_SLOT_ID(pFillCol);
630✔
1625
      SResultCellData* pCell = getResultCell(pFillInfo->pResRow, slotId);
630✔
1626
      SVariant*        pVar = &(pFillCol->fillVal);
630✔
1627
      if (pCell->type == TSDB_DATA_TYPE_FLOAT) {
630!
1628
        float v = 0;
×
1629
        GET_TYPED_DATA(v, float, pVar->nType, &pVar->i);
×
1630
        SET_TYPED_DATA(pCell->pData, pCell->type, v);
×
1631
      } else if (IS_FLOAT_TYPE(pCell->type)) {
838!
1632
        double v = 0;
208✔
1633
        GET_TYPED_DATA(v, double, pVar->nType, &pVar->i);
208!
1634
        SET_TYPED_DATA(pCell->pData, pCell->type, v);
208!
1635
      } else if (IS_INTEGER_TYPE(pCell->type)) {
801!
1636
        int64_t v = 0;
379✔
1637
        GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i);
379!
1638
        SET_TYPED_DATA(pCell->pData, pCell->type, v);
379!
1639
      } else {
1640
        pCell->isNull = true;
43✔
1641
      }
1642
    }
1643
  } else if (pFillInfo->type == TSDB_FILL_NULL || pFillInfo->type == TSDB_FILL_NULL_F) {
184✔
1644
    for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) {
932✔
1645
      SFillColInfo*    pFillCol = pFillSup->pAllColInfo + i;
874✔
1646
      int32_t          slotId = GET_DEST_SLOT_ID(pFillCol);
874✔
1647
      SResultCellData* pCell = getResultCell(pFillInfo->pResRow, slotId);
874✔
1648
      pCell->isNull = true;
874✔
1649
    }
1650
  }
1651
}
227✔
1652

1653
int32_t getDownStreamInfo(SOperatorInfo* downstream, int8_t* triggerType, SInterval* pInterval, SStreamAggSupporter** ppAggSup) {
227✔
1654
  int32_t code = TSDB_CODE_SUCCESS;
227✔
1655
  int32_t lino = 0;
227✔
1656
  if (IS_NORMAL_INTERVAL_OP(downstream)) {
227✔
1657
    SStreamIntervalOperatorInfo* pInfo = downstream->info;
209✔
1658
    *triggerType = pInfo->twAggSup.calTrigger;
209✔
1659
    *pInterval = pInfo->interval;
209✔
1660
    (*ppAggSup) = NULL;
209✔
1661
  } else if (IS_CONTINUE_INTERVAL_OP(downstream)) {
18!
1662
    SStreamIntervalSliceOperatorInfo* pInfo = downstream->info;
18✔
1663
    *triggerType = pInfo->twAggSup.calTrigger;
18✔
1664
    *pInterval = pInfo->interval;
18✔
1665
    pInfo->hasFill = true;
18✔
1666
    (*ppAggSup) = &pInfo->streamAggSup;
18✔
1667
    pInfo->streamAggSup.stateStore.streamStateSetFillInfo(pInfo->streamAggSup.pState);
18✔
1668
  } else {
1669
    code = TSDB_CODE_STREAM_INTERNAL_ERROR;
×
1670
  }
1671
  QUERY_CHECK_CODE(code, lino, _end);
227!
1672
  
1673
_end:
227✔
1674
  if (code != TSDB_CODE_SUCCESS) {
227!
1675
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1676
  }
1677
  return code;
227✔
1678
}
1679

1680
int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode,
227✔
1681
                                     SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1682
  QRY_PARAM_CHECK(pOptrInfo);
227!
1683

1684
  int32_t                  code = TSDB_CODE_SUCCESS;
227✔
1685
  int32_t                  lino = 0;
227✔
1686
  SStreamFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamFillOperatorInfo));
227✔
1687
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
227✔
1688
  if (pInfo == NULL || pOperator == NULL) {
226!
1689
    code = terrno;
×
1690
    QUERY_CHECK_CODE(code, lino, _error);
×
1691
  }
1692

1693
  int32_t    numOfFillCols = 0;
226✔
1694
  SExprInfo* pFillExprInfo = NULL;
226✔
1695

1696
  code = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pFillExprInfo, &numOfFillCols);
226✔
1697
  QUERY_CHECK_CODE(code, lino, _error);
227!
1698

1699
  code = initExprSupp(&pOperator->exprSupp, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI.functionStore);
227✔
1700
  QUERY_CHECK_CODE(code, lino, _error);
227!
1701

1702
  pInfo->pSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
227✔
1703
  QUERY_CHECK_NULL(pInfo->pSrcBlock, code, lino, _error, terrno);
227!
1704

1705
  int8_t triggerType = 0;
227✔
1706
  SInterval interval = {0};
227✔
1707
  code = getDownStreamInfo(downstream, &triggerType, &interval, &pInfo->pStreamAggSup);
227✔
1708
  QUERY_CHECK_CODE(code, lino, _error);
227!
1709

1710
  pInfo->pFillSup = initStreamFillSup(pPhyFillNode, &interval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI,
227✔
1711
                                      pInfo->pSrcBlock);
1712
  if (!pInfo->pFillSup) {
227!
1713
    code = TSDB_CODE_FAILED;
×
1714
    QUERY_CHECK_CODE(code, lino, _error);
×
1715
  }
1716

1717
  initResultSizeInfo(&pOperator->resultInfo, 4096);
227✔
1718
  pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
227✔
1719
  QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
227!
1720

1721
  code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
227✔
1722
  QUERY_CHECK_CODE(code, lino, _error);
227!
1723

1724
  code = blockDataEnsureCapacity(pInfo->pSrcBlock, pOperator->resultInfo.capacity);
227✔
1725
  QUERY_CHECK_CODE(code, lino, _error);
227!
1726

1727
  pInfo->pFillInfo = initStreamFillInfo(pInfo->pFillSup, pInfo->pRes);
227✔
1728
  if (!pInfo->pFillInfo) {
227!
1729
    goto _error;
×
1730
  }
1731

1732
  setValueForFillInfo(pInfo->pFillSup, pInfo->pFillInfo);
227✔
1733

1734
  code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
227✔
1735
  QUERY_CHECK_CODE(code, lino, _error);
227!
1736

1737
  code = blockDataEnsureCapacity(pInfo->pDelRes, pOperator->resultInfo.capacity);
227✔
1738
  QUERY_CHECK_CODE(code, lino, _error);
227!
1739

1740
  pInfo->pUpdated = taosArrayInit(1024, sizeof(SWinKey));
227✔
1741
  QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _error, terrno);
227!
1742

1743
  pInfo->pCloseTs = taosArrayInit(1024, sizeof(TSKEY));
227✔
1744
  QUERY_CHECK_NULL(pInfo->pCloseTs, code, lino, _error, terrno);
227!
1745

1746
  pInfo->primaryTsCol = ((STargetNode*)pPhyFillNode->pWStartTs)->slotId;
227✔
1747
  pInfo->primarySrcSlotId = ((SColumnNode*)((STargetNode*)pPhyFillNode->pWStartTs)->pExpr)->slotId;
227✔
1748

1749
  int32_t numOfOutputCols = 0;
227✔
1750
  code = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols,
227✔
1751
                             COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
1752
  QUERY_CHECK_CODE(code, lino, _error);
227!
1753

1754
  code = filterInitFromNode((SNode*)pPhyFillNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
227✔
1755
  QUERY_CHECK_CODE(code, lino, _error);
227!
1756

1757
  pInfo->srcRowIndex = -1;
227✔
1758
  setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo,
227✔
1759
                  pTaskInfo);
1760

1761
  if (triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
227✔
1762
    pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamForceFillNext, NULL, destroyStreamFillOperatorInfo,
18✔
1763
                                           optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1764
  } else {
1765
    pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamFillNext, NULL, destroyStreamFillOperatorInfo,
209✔
1766
                                           optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1767
  }
1768
  setOperatorStreamStateFn(pOperator, streamOpReleaseState, streamOpReloadState);
227✔
1769

1770
  code = appendDownstream(pOperator, &downstream, 1);
227✔
1771
  QUERY_CHECK_CODE(code, lino, _error);
227!
1772

1773
  *pOptrInfo = pOperator;
227✔
1774
  return TSDB_CODE_SUCCESS;
227✔
1775

1776
_error:
×
1777
  qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
1778

1779
  if (pInfo != NULL) destroyStreamFillOperatorInfo(pInfo);
×
1780
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
1781
  pTaskInfo->code = code;
×
1782
  return code;
×
1783
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc