• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4917

07 Jan 2026 03:52PM UTC coverage: 65.42% (+0.02%) from 65.402%
#4917

push

travis-ci

web-flow
merge: from main to 3.0 branch #34204

31 of 34 new or added lines in 2 files covered. (91.18%)

819 existing lines in 129 files now uncovered.

202679 of 309814 relevant lines covered (65.42%)

116724351.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.42
/source/libs/new-stream/src/streamTriggerMerger.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamTriggerMerger.h"
17

18
#include "streamTriggerTask.h"
19
#include "tcompare.h"
20
#include "tdatablock.h"
21

22
typedef struct SSTriggerMetaDataNode {
23
  SSTriggerMetaData *pMeta;
24
  TD_DLIST_NODE(SSTriggerMetaDataNode);
25
} SSTriggerMetaDataNode;
26

27
typedef struct SSTriggerMetaDataList {
28
  TD_DLIST(SSTriggerMetaDataNode);
29

30
  SSDataBlock *pDataBlock;
31
  int32_t      startIdx;
32
  int32_t      endIdx;
33
  int64_t      nextTs;
34
} SSTriggerMetaDataList;
35

36
/**
 * Find the source index that ranks right after the currently chosen one.
 *
 * Starting from the parent of the adjust position, the function walks up to
 * the root and keeps whichever node compares smallest according to the
 * tree's comparFn.
 *
 * @param pTree merge tree to inspect
 * @param pIdx  receives the index of the runner-up source on success; on
 *              failure it holds the value of tMergeTreeGetAdjustIndex(), which
 *              must not be used as a source index
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_INVALID_PARA when totalSources is 2
 *         or a node on the path has index -1
 */
static int32_t stMergeTreeGetSecondIndex(SMultiwayMergeTreeInfo *pTree, int32_t *pIdx) {
  int32_t adjustPos = tMergeTreeGetAdjustIndex(pTree);
  *pIdx = adjustPos;
  if (pTree->totalSources == 2) {
    return TSDB_CODE_INVALID_PARA;
  }

  // the direct parent is the initial candidate
  int32_t   pos = adjustPos >> 1;
  STreeNode best = pTree->pNode[pos];

  // compare the candidate against every remaining ancestor
  for (pos = pos >> 1; pos > 0; pos = pos >> 1) {
    STreeNode *pAncestor = &pTree->pNode[pos];
    if (pAncestor->index == -1) {
      return TSDB_CODE_INVALID_PARA;
    }
    if (pTree->comparFn(pAncestor, &best, pTree->param) < 0) {
      best = *pAncestor;
    }
  }

  *pIdx = best.index;
  return TSDB_CODE_SUCCESS;
}
59

60
/**
 * Prepare a timestamp sorter for the given trigger task.
 *
 * Records the task, sets the read range to the empty window
 * (skey = INT64_MAX, ekey = INT64_MIN) and creates the node buffer, the
 * metadata list array and the session window array.
 *
 * @param pSorter sorter to initialize
 * @param pTask   trigger task stored in the sorter
 * @return TSDB_CODE_SUCCESS, or the error reported through terrno when one of
 *         the arrays cannot be created. Arrays created before the failure stay
 *         attached to the sorter.
 */
int32_t stTimestampSorterInit(SSTriggerTimestampSorter *pSorter, SStreamTriggerTask *pTask) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  pSorter->pTask = pTask;
  pSorter->readRange = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN};

  // backing storage for the list nodes
  pSorter->pMetaNodeBuf = taosArrayInit(0, sizeof(SSTriggerMetaDataNode));
  QUERY_CHECK_NULL(pSorter->pMetaNodeBuf, code, lino, _end, terrno);

  // one entry per metadata list
  pSorter->pMetaLists = taosArrayInit(0, sizeof(SSTriggerMetaDataList));
  QUERY_CHECK_NULL(pSorter->pMetaLists, code, lino, _end, terrno);

  // session windows
  pSorter->pSessWins = taosArrayInit(0, sizeof(STimeWindow));
  QUERY_CHECK_NULL(pSorter->pSessWins, code, lino, _end, terrno);

_end:
  if (code == TSDB_CODE_SUCCESS) {
    return code;
  }
  ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  return code;
}
82

83
void stTimestampSorterDestroy(void *ptr) {
267,777✔
84
  SSTriggerTimestampSorter **ppSorter = ptr;
267,777✔
85
  if (ppSorter == NULL || *ppSorter == NULL) {
267,777✔
86
    return;
×
87
  }
88

89
  SSTriggerTimestampSorter *pSorter = *ppSorter;
267,777✔
90
  if (pSorter->pMetaNodeBuf != NULL) {
267,777✔
91
    taosArrayDestroy(pSorter->pMetaNodeBuf);
267,777✔
92
    pSorter->pMetaNodeBuf = NULL;
267,777✔
93
  }
94

95
  if (pSorter->pMetaLists != NULL) {
267,777✔
96
    for (int32_t i = 0; i < TARRAY_SIZE(pSorter->pMetaLists); i++) {
296,719✔
97
      SSTriggerMetaDataList *pList = TARRAY_GET_ELEM(pSorter->pMetaLists, i);
28,942✔
98
      if (pList->pDataBlock != NULL) {
28,942✔
99
        blockDataDestroy(pList->pDataBlock);
16,444✔
100
        pList->pDataBlock = NULL;
16,444✔
101
      }
102
    }
103
    taosArrayDestroy(pSorter->pMetaLists);
267,777✔
104
    pSorter->pMetaLists = NULL;
267,777✔
105
  }
106

107
  if (pSorter->pDataMerger != NULL) {
267,777✔
108
    tMergeTreeDestroy(&pSorter->pDataMerger);
39,266✔
109
  }
110

111
  if (pSorter->pSessWins != NULL) {
267,777✔
112
    taosArrayDestroy(pSorter->pSessWins);
267,777✔
113
    pSorter->pSessWins = NULL;
267,777✔
114
  }
115
  taosMemoryFreeClear(*ppSorter);
267,777✔
116
}
117

118
/**
 * Bring a sorter back to its initial state so it can be reused.
 *
 * Clears the flags, sets the read range to the empty window, releases the
 * data blocks held by the metadata lists and empties the three working
 * arrays. The arrays themselves and the merge tree are kept.
 *
 * @param pSorter sorter to reset; NULL is ignored
 */
void stTimestampSorterReset(SSTriggerTimestampSorter *pSorter) {
  if (pSorter == NULL) {
    return;
  }

  pSorter->flags = 0;
  pSorter->readRange = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN};

  if (pSorter->pMetaNodeBuf != NULL) {
    taosArrayClear(pSorter->pMetaNodeBuf);
  }

  if (pSorter->pMetaLists != NULL) {
    // drop the data blocks before the list entries are discarded
    for (int32_t idx = 0; idx < TARRAY_SIZE(pSorter->pMetaLists); idx++) {
      SSTriggerMetaDataList *pEntry = TARRAY_GET_ELEM(pSorter->pMetaLists, idx);
      if (pEntry->pDataBlock == NULL) {
        continue;
      }
      blockDataDestroy(pEntry->pDataBlock);
      pEntry->pDataBlock = NULL;
    }
    taosArrayClear(pSorter->pMetaLists);
  }

  if (pSorter->pSessWins != NULL) {
    taosArrayClear(pSorter->pSessWins);
  }
}
145

146
/**
 * Configure what the sorter is going to read.
 *
 * May only be called while no flag is set on the sorter (right after init or
 * reset).
 *
 * @param pSorter  sorter to configure
 * @param pRange   time range to read; must be non-NULL with skey <= ekey
 * @param tbUid    table uid stored in the sorter
 * @param tsSlotId slot of the timestamp column inside the data blocks
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_INVALID_PARA when a precondition fails
 */
int32_t stTimestampSorterSetSortInfo(SSTriggerTimestampSorter *pSorter, STimeWindow *pRange, int64_t tbUid,
                                     int32_t tsSlotId) {
  int32_t             code = TSDB_CODE_SUCCESS;
  int32_t             lino = 0;
  SStreamTriggerTask *pTask = pSorter->pTask;

  QUERY_CHECK_CONDITION(pSorter->flags == 0, code, lino, _end, TSDB_CODE_INVALID_PARA);
  QUERY_CHECK_CONDITION(!(pRange == NULL || pRange->skey > pRange->ekey), code, lino, _end, TSDB_CODE_INVALID_PARA);

  pSorter->tsSlotId = tsSlotId;
  pSorter->tbUid = tbUid;
  pSorter->readRange = *pRange;

  // remember that the sort info is in place
  BIT_FLAG_SET_MASK(pSorter->flags, TRIGGER_TS_SORTER_MASK_SORT_INFO_SET);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
167

168
/**
 * Load the metadata of a table into the sorter.
 *
 * Metadata entries that are empty or lie completely outside the read range
 * are ignored. The remaining entries are distributed over lists so that,
 * inside one list, every entry starts after the previous one ends
 * (tail.ekey < next.skey); an entry that fits no existing list opens a new one.
 *
 * Requires that only the SORT_INFO_SET flag is set and that the node buffer
 * and the list array are still empty.
 *
 * @param pSorter    sorter to fill
 * @param pTableMeta table metadata whose pMetas array is scanned; the nodes
 *                   keep pointers into that array
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when a precondition fails,
 *         or the error reported through terrno when an array cannot grow
 */
int32_t stTimestampSorterSetMetaDatas(SSTriggerTimestampSorter *pSorter, SSTriggerTableMeta *pTableMeta) {
  int32_t             code = TSDB_CODE_SUCCESS;
  int32_t             lino = 0;
  SStreamTriggerTask *pTask = pSorter->pTask;
  SArray             *pMetaNodeBuf = pSorter->pMetaNodeBuf;
  SArray             *pMetaLists = pSorter->pMetaLists;
  SArray             *pMetas = pTableMeta->pMetas;

  QUERY_CHECK_CONDITION(pSorter->flags == TRIGGER_TS_SORTER_MASK_SORT_INFO_SET, code, lino, _end,
                        TSDB_CODE_INVALID_PARA);
  QUERY_CHECK_CONDITION(pMetaNodeBuf != NULL && TARRAY_SIZE(pMetaNodeBuf) == 0, code, lino, _end,
                        TSDB_CODE_INVALID_PARA);
  QUERY_CHECK_CONDITION(pMetaLists != NULL && TARRAY_SIZE(pMetaLists) == 0, code, lino, _end, TSDB_CODE_INVALID_PARA);

  // pass 1: create one node per relevant metadata entry; the nodes are linked
  // only after the buffer has reached its final size
  int32_t nMetas = taosArrayGetSize(pMetas);
  for (int32_t i = 0; i < nMetas; i++) {
    SSTriggerMetaData *pMeta = TARRAY_GET_ELEM(pMetas, i);
    if ((pMeta->skey > pSorter->readRange.ekey) || (pMeta->ekey < pSorter->readRange.skey) ||
        IS_TRIGGER_META_DATA_EMPTY(pMeta)) {
      continue;
    }

    SSTriggerMetaDataNode *pNode = taosArrayReserve(pMetaNodeBuf, 1);
    QUERY_CHECK_NULL(pNode, code, lino, _end, terrno);
    pNode->pMeta = pMeta;
  }

  // pass 2: append every node to the first list whose tail ends before it starts
  nMetas = TARRAY_SIZE(pMetaNodeBuf);
  for (int32_t i = 0; i < nMetas; i++) {
    SSTriggerMetaDataNode *pNode = TARRAY_GET_ELEM(pMetaNodeBuf, i);
    SSTriggerMetaDataList *pList = NULL;
    for (int32_t j = 0; j < TARRAY_SIZE(pMetaLists); j++) {
      SSTriggerMetaDataList *pCurList = TARRAY_GET_ELEM(pMetaLists, j);
      if (TD_DLIST_TAIL(pCurList) && TD_DLIST_TAIL(pCurList)->pMeta->ekey < pNode->pMeta->skey) {
        pList = pCurList;
        break;
      }
    }
    if (pList == NULL) {
      pList = taosArrayReserve(pMetaLists, 1);
      // the reservation must be validated before the new slot is written
      QUERY_CHECK_NULL(pList, code, lino, _end, terrno);
      *pList = (SSTriggerMetaDataList){.nextTs = pNode->pMeta->skey};
    }
    TD_DLIST_APPEND(pList, pNode);
  }

  BIT_FLAG_SET_MASK(pSorter->flags, TRIGGER_TS_SORTER_MASK_META_DATA_SET);

_end:
  if (TSDB_CODE_SUCCESS != code) {
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
222

223
/**
 * Put the sorter into the mode that works without metadata.
 *
 * Adds a single list without nodes whose nextTs is INT64_MIN and sets the
 * NO_META_DATA flag. Requires that only the SORT_INFO_SET flag is set and
 * that the node buffer and the list array are still empty.
 *
 * @param pSorter sorter to configure
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when a precondition fails,
 *         or the error reported through terrno when the list cannot be added
 */
int32_t stTimestampSorterSetEmptyMetaDatas(SSTriggerTimestampSorter *pSorter) {
  int32_t             code = TSDB_CODE_SUCCESS;
  int32_t             lino = 0;
  SStreamTriggerTask *pTask = pSorter->pTask;
  SArray             *pNodes = pSorter->pMetaNodeBuf;
  SArray             *pLists = pSorter->pMetaLists;

  QUERY_CHECK_CONDITION(pSorter->flags == TRIGGER_TS_SORTER_MASK_SORT_INFO_SET, code, lino, _end,
                        TSDB_CODE_INVALID_PARA);
  QUERY_CHECK_CONDITION(pNodes != NULL && TARRAY_SIZE(pNodes) == 0, code, lino, _end, TSDB_CODE_INVALID_PARA);
  QUERY_CHECK_CONDITION(pLists != NULL && TARRAY_SIZE(pLists) == 0, code, lino, _end, TSDB_CODE_INVALID_PARA);

  // the only list of this mode; it never gets any node
  SSTriggerMetaDataList *pOnlyList = taosArrayReserve(pLists, 1);
  QUERY_CHECK_NULL(pOnlyList, code, lino, _end, terrno);
  *pOnlyList = (SSTriggerMetaDataList){.nextTs = INT64_MIN};

  BIT_FLAG_SET_MASK(pSorter->flags, TRIGGER_TS_SORTER_MASK_NO_META_DATA);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
248

249
/*
 * Advance a list to its next metadata node: the head node (if any) is popped,
 * the attached data block is released, the row range is cleared and nextTs is
 * set to the skey of the new head, or to INT64_MAX when no node is left.
 */
static FORCE_INLINE void stTimestampSorterMetaListMoveForward(SSTriggerMetaDataList *pList) {
  SSTriggerMetaDataNode *pFront = TD_DLIST_HEAD(pList);
  if (pFront != NULL) {
    TD_DLIST_POP(pList, pFront);
  }

  // the block belonged to the node that was just left behind
  if (pList->pDataBlock != NULL) {
    blockDataDestroy(pList->pDataBlock);
    pList->pDataBlock = NULL;
  }
  pList->endIdx = 0;
  pList->startIdx = 0;

  SSTriggerMetaDataNode *pNewFront = TD_DLIST_HEAD(pList);
  if (pNewFront != NULL) {
    pList->nextTs = pNewFront->pMeta->skey;
  } else {
    pList->nextTs = INT64_MAX;
  }
}
111,493✔
261

262
/**
 * Advance a list so that its nextTs is not smaller than ts.
 *
 * Head nodes whose metadata ends before ts are dropped. If nextTs is still
 * smaller than ts afterwards, it is either set to ts (no data block attached)
 * or to the first timestamp >= ts found in the attached block, starting the
 * search at startIdx; in the latter case startIdx and endIdx are both moved to
 * that row.
 *
 * @param pSorter sorter owning the list (provides tsSlotId and the task)
 * @param pList   list to advance
 * @param ts      target timestamp
 * @return TSDB_CODE_SUCCESS, the error reported through terrno when the
 *         timestamp column is missing, or TSDB_CODE_INTERNAL_ERROR when the
 *         block holds no timestamp >= ts
 */
static int32_t stTimestampSorterMetaListSkip2Ts(SSTriggerTimestampSorter *pSorter, SSTriggerMetaDataList *pList,
                                                int64_t ts) {
  int32_t             code = TSDB_CODE_SUCCESS;
  int32_t             lino = 0;
  SStreamTriggerTask *pTask = pSorter->pTask;

  // drop every leading meta that ends before ts
  for (SSTriggerMetaDataNode *pFront = TD_DLIST_HEAD(pList); pFront != NULL && pFront->pMeta->ekey < ts;
       pFront = TD_DLIST_HEAD(pList)) {
    stTimestampSorterMetaListMoveForward(pList);
  }

  if (pList->nextTs < ts) {
    if (pList->pDataBlock != NULL) {
      // locate the first row at or after ts inside the attached block
      int32_t          numRows = blockDataGetNumOfRows(pList->pDataBlock);
      SColumnInfoData *pTsCol = taosArrayGet(pList->pDataBlock->pDataBlock, pSorter->tsSlotId);
      QUERY_CHECK_NULL(pTsCol, code, lino, _end, terrno);
      int64_t *pTsData = (int64_t *)pTsCol->pData;
      void    *pHit = taosbsearch(&ts, pTsData + pList->startIdx, numRows - pList->startIdx, sizeof(int64_t),
                                  compareInt64Val, TD_GE);
      QUERY_CHECK_NULL(pHit, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
      pList->startIdx = POINTER_DISTANCE(pHit, pTsData) / sizeof(int64_t);
      pList->endIdx = pList->startIdx;
      pList->nextTs = *(int64_t *)pHit;
    } else {
      pList->nextTs = ts;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
300

301
static int32_t stTimestampSorterMetaListCompare(const void *pLeft, const void *pRight, void *param) {
5,729,442✔
302
  int32_t left = *(const int32_t *)pLeft;
5,729,442✔
303
  int32_t right = *(const int32_t *)pRight;
5,729,442✔
304
  SArray *pMetaLists = (SArray *)param;
5,729,442✔
305

306
  if (left < TARRAY_SIZE(pMetaLists) && right < TARRAY_SIZE(pMetaLists)) {
5,729,442✔
307
    SSTriggerMetaDataList *pLeftList = TARRAY_GET_ELEM(pMetaLists, left);
×
308
    SSTriggerMetaDataList *pRightList = TARRAY_GET_ELEM(pMetaLists, right);
×
309

310
    if (pLeftList->nextTs < pRightList->nextTs) {
×
311
      return -1;
×
312
    } else if (pLeftList->nextTs > pRightList->nextTs) {
×
313
      return 1;
×
314
    } else if (pLeftList->nextTs != INT64_MAX) {
×
315
      // sort by version in descending order
316
      int64_t verLeft = TD_DLIST_HEAD(pLeftList)->pMeta->ver;
×
317
      int64_t verRight = TD_DLIST_HEAD(pRightList)->pMeta->ver;
×
318
      if (verLeft < verRight) {
×
319
        return 1;
×
320
      } else if (verLeft > verRight) {
×
321
        return -1;
×
322
      }
323
    }
324
  }
325
  // fallback to index comparison
326
  if (left < right) {
5,729,442✔
327
    return -1;
×
328
  } else if (left > right) {
5,729,442✔
329
    return 1;
5,729,442✔
330
  }
331
  return 0;
×
332
}
333

334
/**
 * Build (or rebuild) the merge tree over the metadata lists.
 *
 * Requires the META_DATA_SET flag and that the merger has not been built yet.
 * Without any list the sorter is marked empty. Otherwise every list is
 * advanced to readRange.skey, and the merge tree is rebuilt when an existing
 * one has enough sources, or created with a capacity rounded up to a multiple
 * of 8. On success the DATA_MERGER_BUILD flag is set.
 *
 * @param pSorter sorter to prepare
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when a precondition fails,
 *         or the error of the failing helper
 */
static int32_t stTimestampSorterBuildDataMerger(SSTriggerTimestampSorter *pSorter) {
  int32_t             code = TSDB_CODE_SUCCESS;
  int32_t             lino = 0;
  SStreamTriggerTask *pTask = pSorter->pTask;

  QUERY_CHECK_CONDITION(BIT_FLAG_TEST_MASK(pSorter->flags, TRIGGER_TS_SORTER_MASK_META_DATA_SET), code, lino, _end,
                        TSDB_CODE_INVALID_PARA);
  QUERY_CHECK_CONDITION(!BIT_FLAG_TEST_MASK(pSorter->flags, TRIGGER_TS_SORTER_MASK_DATA_MERGER_BUILD), code, lino, _end,
                        TSDB_CODE_INVALID_PARA);

  int32_t listCount = TARRAY_SIZE(pSorter->pMetaLists);
  if (listCount == 0) {
    // nothing to merge
    SET_TRIGGER_TIMESTAMP_SORTER_EMPTY(pSorter);
    goto _end;
  }

  // position every list at the beginning of the read range
  for (int32_t idx = 0; idx < listCount; idx++) {
    SSTriggerMetaDataList *pEntry = TARRAY_GET_ELEM(pSorter->pMetaLists, idx);
    code = stTimestampSorterMetaListSkip2Ts(pSorter, pEntry, pSorter->readRange.skey);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  // an existing merger that is too small cannot be reused
  if (pSorter->pDataMerger != NULL && pSorter->pDataMerger->numOfSources < listCount) {
    tMergeTreeDestroy(&pSorter->pDataMerger);
  }

  if (pSorter->pDataMerger != NULL) {
    code = tMergeTreeRebuild(pSorter->pDataMerger);
    QUERY_CHECK_CODE(code, lino, _end);
  } else {
    // capacity is the list count rounded up to a multiple of 8
    int32_t capacity = ((listCount + 7) / 8) * 8;
    code = tMergeTreeCreate(&pSorter->pDataMerger, capacity, pSorter->pMetaLists, stTimestampSorterMetaListCompare);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code == TSDB_CODE_SUCCESS) {
    BIT_FLAG_SET_MASK(pSorter->flags, TRIGGER_TS_SORTER_MASK_DATA_MERGER_BUILD);
  } else {
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
378

379
/**
 * Return the next run of rows in timestamp order.
 *
 * The outputs are first cleared (*ppDataBlock = NULL, both indices 0). The
 * row range handed out by the previous call is treated as consumed, then the
 * list with the smallest nextTs is selected and the rows of its data block
 * that can be emitted before any other list has to be consulted are returned
 * as [*pStartIdx, *pEndIdx).
 *
 * When the function succeeds with *ppDataBlock == NULL, either the sorter has
 * no more data in the read range or the selected list has no data block
 * attached yet and one has to be fetched for it.
 *
 * @param pSorter     sorter to read from
 * @param ppDataBlock receives the block holding the rows, or NULL
 * @param pStartIdx   receives the first row of the run
 * @param pEndIdx     receives one past the last row of the run
 * @return TSDB_CODE_SUCCESS or the error of the failing step
 */
int32_t stTimestampSorterNextDataBlock(SSTriggerTimestampSorter *pSorter, SSDataBlock **ppDataBlock, int32_t *pStartIdx,
                                       int32_t *pEndIdx) {
  int32_t                code = TSDB_CODE_SUCCESS;
  int32_t                lino = 0;
  SStreamTriggerTask    *pTask = pSorter->pTask;
  SSTriggerMetaDataList *pList = NULL;

  *ppDataBlock = NULL;
  *pStartIdx = 0;
  *pEndIdx = 0;

  if (BIT_FLAG_TEST_MASK(pSorter->flags, TRIGGER_TS_SORTER_MASK_NO_META_DATA)) {
    // mode without metadata: there is exactly one list and no merge tree
    pList = TARRAY_DATA(pSorter->pMetaLists);
    if (pList->pDataBlock != NULL && pList->startIdx < pList->endIdx) {
      int32_t          nrows = blockDataGetNumOfRows(pList->pDataBlock);
      SColumnInfoData *pTsCol = taosArrayGet(pList->pDataBlock->pDataBlock, pSorter->tsSlotId);
      QUERY_CHECK_NULL(pTsCol, code, lino, _end, terrno);
      int64_t *pTsData = (int64_t *)pTsCol->pData;
      pList->startIdx = pList->endIdx;
      if (pList->startIdx < nrows) {
        pList->nextTs = pTsData[pList->startIdx];
      } else {
        // block fully consumed: read the last timestamp before releasing it
        pList->nextTs = pTsData[nrows - 1] + 1;
        blockDataDestroy(pList->pDataBlock);
        // clear the pointer so the released block is neither used nor returned below
        pList->pDataBlock = NULL;
        pList->startIdx = pList->endIdx = 0;
      }
    }
    if (pList->pDataBlock != NULL) {
      int32_t nrows = blockDataGetNumOfRows(pList->pDataBlock);
      pList->endIdx = nrows;
      *ppDataBlock = pList->pDataBlock;
      *pStartIdx = pList->startIdx;
      *pEndIdx = pList->endIdx;
    } else {
      // need to fetch new data block
    }
    goto _end;
  }

  if (!BIT_FLAG_TEST_MASK(pSorter->flags, TRIGGER_TS_SORTER_MASK_DATA_MERGER_BUILD)) {
    code = stTimestampSorterBuildDataMerger(pSorter);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  if (IS_TRIGGER_TIMESTAMP_SORTER_EMPTY(pSorter)) {
    goto _end;
  } else {
    // consume the range returned by the previous call, if any
    pList = TARRAY_GET_ELEM(pSorter->pMetaLists, tMergeTreeGetChosenIndex(pSorter->pDataMerger));
    if (pList->pDataBlock != NULL && pList->startIdx < pList->endIdx) {
      int32_t          nrows = blockDataGetNumOfRows(pList->pDataBlock);
      SColumnInfoData *pTsCol = taosArrayGet(pList->pDataBlock->pDataBlock, pSorter->tsSlotId);
      QUERY_CHECK_NULL(pTsCol, code, lino, _end, terrno);
      int64_t *pTsData = (int64_t *)pTsCol->pData;
      // update read progress
      pSorter->readRange.skey = TMAX(pSorter->readRange.skey, pTsData[pList->endIdx - 1] + 1);
      // move forward to next block range
      pList->startIdx = pList->endIdx;
      if (pList->startIdx < nrows) {
        pList->nextTs = pTsData[pList->startIdx];
      } else {
        stTimestampSorterMetaListMoveForward(pList);
      }
      code = tMergeTreeAdjust(pSorter->pDataMerger, tMergeTreeGetAdjustIndex(pSorter->pDataMerger));
      QUERY_CHECK_CODE(code, lino, _end);
    }
  }

  while (!IS_TRIGGER_TIMESTAMP_SORTER_EMPTY(pSorter)) {
    pList = TARRAY_GET_ELEM(pSorter->pMetaLists, tMergeTreeGetChosenIndex(pSorter->pDataMerger));
    if (pList->nextTs > pSorter->readRange.ekey) {
      // even the smallest pending timestamp is past the read range
      SET_TRIGGER_TIMESTAMP_SORTER_EMPTY(pSorter);
      continue;
    }

    if (pList->nextTs < pSorter->readRange.skey) {
      // the list is behind the read progress: catch up and pick again
      code = stTimestampSorterMetaListSkip2Ts(pSorter, pList, pSorter->readRange.skey);
      QUERY_CHECK_CODE(code, lino, _end);
      code = tMergeTreeAdjust(pSorter->pDataMerger, tMergeTreeGetAdjustIndex(pSorter->pDataMerger));
      QUERY_CHECK_CODE(code, lino, _end);
      continue;
    }

    // rows of the chosen list may be emitted up to the point where the
    // runner-up list has to be looked at
    int64_t endTime = pSorter->readRange.ekey;
    if (TARRAY_SIZE(pSorter->pMetaLists) > 1) {
      int32_t idx2 = 0;
      code = stMergeTreeGetSecondIndex(pSorter->pDataMerger, &idx2);
      // on failure idx2 is not a list index and must not be used
      QUERY_CHECK_CODE(code, lino, _end);
      SSTriggerMetaDataList *pList2 = TARRAY_GET_ELEM(pSorter->pMetaLists, idx2);
      if (pList->nextTs == pList2->nextTs) {
        endTime = TMIN(pList->nextTs, endTime);
      } else {
        endTime = TMIN(pList2->nextTs - 1, endTime);
      }
    }
    QUERY_CHECK_CONDITION(endTime >= pSorter->readRange.skey, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);

    if (pList->pDataBlock != NULL) {
      int32_t          nrows = blockDataGetNumOfRows(pList->pDataBlock);
      SColumnInfoData *pTsCol = taosArrayGet(pList->pDataBlock->pDataBlock, pSorter->tsSlotId);
      QUERY_CHECK_NULL(pTsCol, code, lino, _end, terrno);
      int64_t *pTsData = (int64_t *)pTsCol->pData;
      // the run ends at the first row beyond endTime, or at the end of the block
      void    *px = taosbsearch(&endTime, pTsData + pList->startIdx, nrows - pList->startIdx, sizeof(int64_t),
                                compareInt64Val, TD_GT);
      pList->endIdx = (px != NULL) ? (POINTER_DISTANCE(px, pTsData) / sizeof(int64_t)) : nrows;
      *ppDataBlock = pList->pDataBlock;
      *pStartIdx = pList->startIdx;
      *pEndIdx = pList->endIdx;
    } else {
      // need to fetch data block
    }
    break;
  }

_end:
  if (TSDB_CODE_SUCCESS != code) {
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
497

498
/**
 * Skip up to nrowsToSkip rows in timestamp order.
 *
 * Rows are skipped through stTimestampSorterNextDataBlock() whenever a data
 * block is available. When the chosen list has no block attached, its head
 * metadata is skipped as a whole, provided the row count of the metadata is
 * accurate, the metadata lies within [nextTs, endTime] and skipping it does
 * not exceed nrowsToSkip. The loop stops early when a data block has to be
 * fetched or when no data is left.
 *
 * @param pSorter     sorter to advance
 * @param nrowsToSkip maximum number of rows to skip
 * @param pSkipped    optional; receives the number of rows skipped (success only)
 * @param pLastTs     optional; receives the timestamp of the last skipped row,
 *                    INT64_MIN when nothing was skipped (success only)
 * @return TSDB_CODE_SUCCESS or the error of the failing step
 */
int32_t stTimestampSorterForwardNrows(SSTriggerTimestampSorter *pSorter, int64_t nrowsToSkip, int64_t *pSkipped,
                                      int64_t *pLastTs) {
  int32_t                code = TSDB_CODE_SUCCESS;
  int32_t                lino = 0;
  SStreamTriggerTask    *pTask = pSorter->pTask;
  SSTriggerMetaDataList *pList = NULL;
  int64_t                skipped = 0;
  int64_t                lastTs = INT64_MIN;

  while (skipped < nrowsToSkip) {
    SSDataBlock *pDataBlock = NULL;
    int32_t      startIdx = 0;
    int32_t      endIdx = 0;
    code = stTimestampSorterNextDataBlock(pSorter, &pDataBlock, &startIdx, &endIdx);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pDataBlock != NULL) {
      // update skipped rows
      SColumnInfoData *pTsCol = taosArrayGet(pDataBlock->pDataBlock, pSorter->tsSlotId);
      QUERY_CHECK_NULL(pTsCol, code, lino, _end, terrno);
      int64_t *pTsData = (int64_t *)pTsCol->pData;
      int32_t  stepSkipped = TMIN(endIdx - startIdx, nrowsToSkip - skipped);
      skipped += stepSkipped;
      lastTs = pTsData[startIdx + stepSkipped - 1];

      // shrink the data block range to only contain the skipped rows
      pList = TARRAY_GET_ELEM(pSorter->pMetaLists, tMergeTreeGetChosenIndex(pSorter->pDataMerger));
      QUERY_CHECK_CONDITION(pList->pDataBlock == pDataBlock, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
      pList->endIdx = startIdx + stepSkipped;
    } else if (!IS_TRIGGER_TIMESTAMP_SORTER_EMPTY(pSorter)) {
      pList = TARRAY_GET_ELEM(pSorter->pMetaLists, tMergeTreeGetChosenIndex(pSorter->pDataMerger));
      // try to use metadata to skip rows
      int64_t endTime = pSorter->readRange.ekey;
      if (TARRAY_SIZE(pSorter->pMetaLists) > 1) {
        int32_t idx2 = 0;
        code = stMergeTreeGetSecondIndex(pSorter->pDataMerger, &idx2);
        // on failure idx2 is not a list index and must not be used
        QUERY_CHECK_CODE(code, lino, _end);
        SSTriggerMetaDataList *pList2 = TARRAY_GET_ELEM(pSorter->pMetaLists, idx2);
        if (pList->nextTs == pList2->nextTs) {
          endTime = TMIN(pList->nextTs, endTime);
        } else {
          endTime = TMIN(pList2->nextTs - 1, endTime);
        }
      }
      QUERY_CHECK_CONDITION(endTime >= pSorter->readRange.skey, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);

      SSTriggerMetaData *pMeta = TD_DLIST_HEAD(pList)->pMeta;
      if (!IS_TRIGGER_META_NROW_INACCURATE(pMeta) && (pMeta->skey >= pList->nextTs) && (pMeta->ekey <= endTime) &&
          (skipped + pMeta->nrows <= nrowsToSkip)) {
        // update skipped rows
        skipped += pMeta->nrows;
        lastTs = pMeta->ekey;
        // update read progress
        pSorter->readRange.skey = TMAX(pSorter->readRange.skey, pMeta->ekey + 1);
        // move forward to next meta
        stTimestampSorterMetaListMoveForward(pList);
        code = tMergeTreeAdjust(pSorter->pDataMerger, tMergeTreeGetAdjustIndex(pSorter->pDataMerger));
        QUERY_CHECK_CODE(code, lino, _end);
      } else {
        // need to fetch data block
        break;
      }
    } else {
      // no more data
      break;
    }
  }

_end:
  if (TSDB_CODE_SUCCESS != code) {
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  } else {
    // the outputs are only written on success
    if (pSkipped != NULL) *pSkipped = skipped;
    if (pLastTs != NULL) *pLastTs = lastTs;
  }
  return code;
}
574

575
static int32_t stTimestampSorterWindowReverseCompare(const void *pLeft, const void *pRight) {
3,476✔
576
  STimeWindow *pLeftWin = (STimeWindow *)pLeft;
3,476✔
577
  STimeWindow *pRightWin = (STimeWindow *)pRight;
3,476✔
578
  if (pLeftWin->ekey < pRightWin->ekey) {
3,476✔
579
    return 1;
3,476✔
580
  } else if (pLeftWin->ekey > pRightWin->ekey) {
×
581
    return -1;
×
582
  } else if (pLeftWin->skey < pRightWin->skey) {
×
583
    return 1;
×
584
  } else if (pLeftWin->skey > pRightWin->skey) {
×
585
    return -1;
×
586
  }
587
  return 0;
×
588
}
589

590
// Builds pSorter->pSessWins from the metadata of every meta list.
//
// Every skey/ekey that is not marked inaccurate and lies inside pSorter->readRange is treated as
// a known timestamp. Within one meta list, a timestamp that is no more than `gap` after the end
// of the previous window extends that window, otherwise it opens a new one. Afterwards all
// windows are sorted with stTimestampSorterWindowReverseCompare (descending) and neighbouring
// windows that are within `gap` of each other are merged in place.
//
// Preconditions (TSDB_CODE_INVALID_PARA otherwise): TRIGGER_TS_SORTER_MASK_META_DATA_SET is set,
// TRIGGER_TS_SORTER_MASK_SESS_WIN_BUILD is not set, and pSessWins is a non-NULL empty array.
// On success TRIGGER_TS_SORTER_MASK_SESS_WIN_BUILD is set; on failure the error is logged and
// returned (terrno is used when growing the array fails).
static int32_t stTimestampSorterBuildSessWin(SSTriggerTimestampSorter *pSorter, int64_t gap) {
  int32_t             code = TSDB_CODE_SUCCESS;
  int32_t             lino = 0;
  SStreamTriggerTask *pTask = pSorter->pTask;
  SArray             *pSessWins = pSorter->pSessWins;

  QUERY_CHECK_CONDITION(BIT_FLAG_TEST_MASK(pSorter->flags, TRIGGER_TS_SORTER_MASK_META_DATA_SET), code, lino, _end,
                        TSDB_CODE_INVALID_PARA);
  QUERY_CHECK_CONDITION(!BIT_FLAG_TEST_MASK(pSorter->flags, TRIGGER_TS_SORTER_MASK_SESS_WIN_BUILD), code, lino, _end,
                        TSDB_CODE_INVALID_PARA);
  QUERY_CHECK_CONDITION(pSessWins != NULL && TARRAY_SIZE(pSessWins) == 0, code, lino, _end, TSDB_CODE_INVALID_PARA);

  int32_t numList = TARRAY_SIZE(pSorter->pMetaLists);
  for (int32_t i = 0; i < numList; i++) {
    SSTriggerMetaDataList *pList = TARRAY_GET_ELEM(pSorter->pMetaLists, i);
    // last window appended for this list; always the most recently reserved element, so it
    // stays valid until the next taosArrayReserve call replaces it
    STimeWindow *pLastWin = NULL;
    for (SSTriggerMetaDataNode *pNode = TD_DLIST_HEAD(pList); pNode != NULL; pNode = TD_DLIST_NODE_NEXT(pNode)) {
      if (!IS_TRIGGER_META_SKEY_INACCURATE(pNode->pMeta) && (pNode->pMeta->skey >= pSorter->readRange.skey) &&
          (pNode->pMeta->skey <= pSorter->readRange.ekey)) {
        int64_t ts = pNode->pMeta->skey;
        if (pLastWin != NULL && pLastWin->ekey + gap >= ts) {
          pLastWin->ekey = TMAX(pLastWin->ekey, ts);
        } else {
          pLastWin = taosArrayReserve(pSessWins, 1);
          // growing the array may fail; do not dereference a NULL slot
          QUERY_CHECK_NULL(pLastWin, code, lino, _end, terrno);
          *pLastWin = (STimeWindow){.skey = ts, .ekey = ts};
        }
      }
      if (!IS_TRIGGER_META_EKEY_INACCURATE(pNode->pMeta) && (pNode->pMeta->ekey >= pSorter->readRange.skey) &&
          (pNode->pMeta->ekey <= pSorter->readRange.ekey)) {
        int64_t ts = pNode->pMeta->ekey;
        if (pLastWin != NULL && pLastWin->ekey + gap >= ts) {
          pLastWin->ekey = TMAX(pLastWin->ekey, ts);
        } else {
          pLastWin = taosArrayReserve(pSessWins, 1);
          // growing the array may fail; do not dereference a NULL slot
          QUERY_CHECK_NULL(pLastWin, code, lino, _end, terrno);
          *pLastWin = (STimeWindow){.skey = ts, .ekey = ts};
        }
      }
    }
  }

  int32_t numWins = TARRAY_SIZE(pSessWins);
  if (numWins == 0) {
    goto _end;
  }

  // windows are now in descending order, so each following window can only extend the start
  // of the window currently being built
  taosArraySort(pSessWins, stTimestampSorterWindowReverseCompare);
  STimeWindow *pWin = TARRAY_GET_ELEM(pSessWins, 0);
  for (int32_t i = 1; i < numWins; i++) {
    STimeWindow *pCurWin = TARRAY_GET_ELEM(pSessWins, i);
    if (pCurWin->ekey + gap >= pWin->skey) {
      pWin->skey = TMIN(pWin->skey, pCurWin->skey);
    } else {
      ++pWin;
      *pWin = *pCurWin;
    }
  }
  // drop the slots that were merged away
  TARRAY_SIZE(pSessWins) = TARRAY_ELEM_IDX(pSessWins, pWin) + 1;

_end:
  if (TSDB_CODE_SUCCESS != code) {
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  } else {
    BIT_FLAG_SET_MASK(pSorter->flags, TRIGGER_TS_SORTER_MASK_SESS_WIN_BUILD);
  }
  return code;
}
656

657
// Moves `ts` forward for as long as the next known timestamp is no more than `gap` away.
//
// Session windows are built from metadata on the first call (see stTimestampSorterBuildSessWin).
// Each round first consumes session windows from the tail of pSorter->pSessWins, then asks
// stTimestampSorterNextDataBlock for rows between ts + 1 and the next window start and walks
// those rows.
//
// On success *pLastTs (if not NULL) receives the final ts and *pNextTs (if not NULL) receives
// the first known timestamp that is more than `gap` after it, or INT64_MAX when none is known
// (this includes the case where a data block has to be fetched first). Nothing is written to
// the outputs on failure.
//
// Side effects: pSorter->readRange.skey is raised to at least ts + 1, consumed session windows
// are removed, and when the walk stops inside a data block the owning list's endIdx is shrunk
// to the rows that were checked.
int32_t stTimestampSorterForwardTs(SSTriggerTimestampSorter *pSorter, int64_t ts, int64_t gap, int64_t *pLastTs,
                                   int64_t *pNextTs) {
  int32_t             code = TSDB_CODE_SUCCESS;
  int32_t             lino = 0;
  SStreamTriggerTask *pTask = pSorter->pTask;
  int64_t             nextTs = INT64_MAX;

  if (!BIT_FLAG_TEST_MASK(pSorter->flags, TRIGGER_TS_SORTER_MASK_SESS_WIN_BUILD)) {
    code = stTimestampSorterBuildSessWin(pSorter, gap);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  for (;;) {
    nextTs = INT64_MAX;

    // consume session windows (generated from metadata) that are reachable from ts
    while (TARRAY_SIZE(pSorter->pSessWins) > 0) {
      STimeWindow *pSessWin = taosArrayPop(pSorter->pSessWins);
      if (ts + gap < pSessWin->skey) {
        // not reachable: remember where it starts and push the window back
        nextTs = pSessWin->skey;
        TARRAY_SIZE(pSorter->pSessWins)++;
        break;
      }
      ts = TMAX(pSessWin->ekey, ts);
    }

    // read the rows strictly between ts and nextTs; the upper bound is only narrowed for
    // this call and restored before the result code is examined
    int64_t origEkey = pSorter->readRange.ekey;
    pSorter->readRange.skey = TMAX(pSorter->readRange.skey, ts + 1);
    pSorter->readRange.ekey = TMIN(pSorter->readRange.ekey, nextTs - 1);
    SSDataBlock *pBlock = NULL;
    int32_t      rowIdx = 0;
    int32_t      rowEnd = 0;
    code = stTimestampSorterNextDataBlock(pSorter, &pBlock, &rowIdx, &rowEnd);
    bool needFetch = (pBlock == NULL) && !IS_TRIGGER_TIMESTAMP_SORTER_EMPTY(pSorter);
    pSorter->readRange.ekey = origEkey;
    QUERY_CHECK_CODE(code, lino, _end);

    if (pBlock == NULL) {
      if (needFetch) {
        // a data block has to be fetched before the walk can continue
        nextTs = INT64_MAX;
      }
      // otherwise there is no more data; keep nextTs as found from the session windows
      break;
    }

    // walk the rows of the data block
    SColumnInfoData *pTsCol = taosArrayGet(pBlock->pDataBlock, pSorter->tsSlotId);
    QUERY_CHECK_NULL(pTsCol, code, lino, _end, terrno);
    const int64_t *pTsData = (const int64_t *)pTsCol->pData;
    for (; rowIdx < rowEnd; rowIdx++) {
      if (ts + gap < pTsData[rowIdx]) {
        nextTs = pTsData[rowIdx];
        break;
      }
      ts = TMAX(ts, pTsData[rowIdx]);
    }

    if (rowIdx < rowEnd) {
      // two consecutive rows are further apart than gap: shrink the block range so that it
      // only contains the rows that were checked, then stop
      SSTriggerMetaDataList *pList =
          TARRAY_GET_ELEM(pSorter->pMetaLists, tMergeTreeGetChosenIndex(pSorter->pDataMerger));
      QUERY_CHECK_CONDITION(pList->pDataBlock == pBlock, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
      pList->endIdx = rowIdx;
      break;
    }
  }

_end:
  if (TSDB_CODE_SUCCESS != code) {
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  } else {
    if (pLastTs != NULL) *pLastTs = ts;
    if (pNextTs != NULL) *pNextTs = nextTs;
  }
  return code;
}
740

741
// Reports which metadata entry needs its data block fetched next.
//
// *ppMeta is set to NULL first. If the sorter runs without metadata
// (TRIGGER_TS_SORTER_MASK_NO_META_DATA) or is empty, it stays NULL and success is returned.
// Otherwise the data merger must already be built (TSDB_CODE_INVALID_PARA if not), and *ppMeta
// receives the head meta of the list chosen by the merger, provided that list has no data
// block bound and is not empty.
int32_t stTimestampSorterGetMetaToFetch(SSTriggerTimestampSorter *pSorter, SSTriggerMetaData **ppMeta) {
  int32_t             code = TSDB_CODE_SUCCESS;
  int32_t             lino = 0;
  SStreamTriggerTask *pTask = pSorter->pTask;

  *ppMeta = NULL;

  bool nothingToFetch = BIT_FLAG_TEST_MASK(pSorter->flags, TRIGGER_TS_SORTER_MASK_NO_META_DATA) ||
                        IS_TRIGGER_TIMESTAMP_SORTER_EMPTY(pSorter);
  if (!nothingToFetch) {
    QUERY_CHECK_CONDITION(BIT_FLAG_TEST_MASK(pSorter->flags, TRIGGER_TS_SORTER_MASK_DATA_MERGER_BUILD), code, lino,
                          _end, TSDB_CODE_INVALID_PARA);
    SSTriggerMetaDataList *pChosen =
        TARRAY_GET_ELEM(pSorter->pMetaLists, tMergeTreeGetChosenIndex(pSorter->pDataMerger));
    if (pChosen->pDataBlock == NULL && TD_DLIST_HEAD(pChosen) != NULL) {
      *ppMeta = TD_DLIST_HEAD(pChosen)->pMeta;
    }
  }

_end:
  if (TSDB_CODE_SUCCESS != code) {
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
766

767
// Binds a fetched data block to the sorter.
//
// Without metadata (TRIGGER_TS_SORTER_MASK_NO_META_DATA): the block is attached to the first
// meta list, its row range is reset to [0, 0) and nextTs is taken from the first timestamp.
// In this mode *ppDataBlock is left untouched.
//
// With metadata: the sorter must be non-empty and the data merger built. The block is moved
// into the list chosen by the merger (*ppDataBlock becomes NULL), which must have a head meta
// and no block bound yet. An empty block marks the head meta as empty and advances the list.
// Otherwise the head meta's skey/ekey/nrows are replaced by the values read from the block
// (which must lie within the meta's previous range, TSDB_CODE_INTERNAL_ERROR if not), and the
// list is positioned at the first row whose timestamp is >= the list's nextTs, or advanced if
// there is no such row. Finally the merge tree is adjusted.
int32_t stTimestampSorterBindDataBlock(SSTriggerTimestampSorter *pSorter, SSDataBlock **ppDataBlock) {
  int32_t                code = TSDB_CODE_SUCCESS;
  int32_t                lino = 0;
  SStreamTriggerTask    *pTask = pSorter->pTask;
  SSTriggerMetaDataList *pTarget = NULL;

  if (BIT_FLAG_TEST_MASK(pSorter->flags, TRIGGER_TS_SORTER_MASK_NO_META_DATA)) {
    SColumnInfoData *pTsCol = taosArrayGet((*ppDataBlock)->pDataBlock, pSorter->tsSlotId);
    QUERY_CHECK_NULL(pTsCol, code, lino, _end, terrno);
    int64_t *pTsData = (int64_t *)pTsCol->pData;

    pTarget = TARRAY_DATA(pSorter->pMetaLists);
    QUERY_CHECK_CONDITION(pTarget->pDataBlock == NULL, code, lino, _end, TSDB_CODE_INVALID_PARA);
    pTarget->pDataBlock = *ppDataBlock;
    pTarget->startIdx = pTarget->endIdx = 0;
    pTarget->nextTs = pTsData[0];
    goto _end;
  }

  QUERY_CHECK_CONDITION(!IS_TRIGGER_TIMESTAMP_SORTER_EMPTY(pSorter), code, lino, _end, TSDB_CODE_INVALID_PARA);
  QUERY_CHECK_CONDITION(BIT_FLAG_TEST_MASK(pSorter->flags, TRIGGER_TS_SORTER_MASK_DATA_MERGER_BUILD), code, lino, _end,
                        TSDB_CODE_INVALID_PARA);

  pTarget = TARRAY_GET_ELEM(pSorter->pMetaLists, tMergeTreeGetChosenIndex(pSorter->pDataMerger));
  QUERY_CHECK_CONDITION(pTarget->pDataBlock == NULL, code, lino, _end, TSDB_CODE_INVALID_PARA);
  QUERY_CHECK_NULL(TD_DLIST_HEAD(pTarget), code, lino, _end, TSDB_CODE_INVALID_PARA);

  // take over the block from the caller
  pTarget->pDataBlock = *ppDataBlock;
  *ppDataBlock = NULL;

  SSTriggerMetaData *pHeadMeta = TD_DLIST_HEAD(pTarget)->pMeta;
  int32_t            numRows = blockDataGetNumOfRows(pTarget->pDataBlock);
  if (numRows > 0) {
    SColumnInfoData *pTsCol = taosArrayGet(pTarget->pDataBlock->pDataBlock, pSorter->tsSlotId);
    QUERY_CHECK_NULL(pTsCol, code, lino, _end, terrno);
    int64_t *pTsData = (int64_t *)pTsCol->pData;

    // refresh the meta with the actual values, which may help with subsequent data merging
    int64_t firstTs = pTsData[0];
    int64_t lastTs = pTsData[numRows - 1];
    QUERY_CHECK_CONDITION(firstTs >= pHeadMeta->skey && lastTs <= pHeadMeta->ekey, code, lino, _end,
                          TSDB_CODE_INTERNAL_ERROR);
    pHeadMeta->skey = firstTs;
    pHeadMeta->ekey = lastTs;
    pHeadMeta->nrows = numRows;

    // locate the first row at or after the list's read position
    void *pHit = taosbsearch(&pTarget->nextTs, pTsData, numRows, sizeof(int64_t), compareInt64Val, TD_GE);
    if (pHit != NULL) {
      pTarget->startIdx = POINTER_DISTANCE(pHit, pTsData) / sizeof(int64_t);
      pTarget->endIdx = pTarget->startIdx;
      pTarget->nextTs = *(int64_t *)pHit;
    } else {
      // every row is before the read position
      stTimestampSorterMetaListMoveForward(pTarget);
    }
  } else {
    // the block carries no rows
    SET_TRIGGER_META_DATA_EMPTY(pHeadMeta);
    stTimestampSorterMetaListMoveForward(pTarget);
  }

  code = tMergeTreeAdjust(pSorter->pDataMerger, tMergeTreeGetAdjustIndex(pSorter->pDataMerger));
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (TSDB_CODE_SUCCESS != code) {
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
831

832
typedef struct SVtableMergerReaderInfo {
833
  SSTriggerTableColRef *pColRef;
834
  SSDataBlock          *pDataBlock;
835
  int32_t               startIdx;
836
  int32_t               endIdx;
837
  int64_t               nextTs;
838
} SVtableMergerReaderInfo;
839

840
static int32_t stVtableMergerReaderInfoCompare(const void *pLeft, const void *pRight, void *param) {
7,829,890✔
841
  int32_t left = *(const int32_t *)pLeft;
7,829,890✔
842
  int32_t right = *(const int32_t *)pRight;
7,829,890✔
843
  SArray *pReaderInfos = (SArray *)param;
7,829,890✔
844

845
  if (left < TARRAY_SIZE(pReaderInfos) && right < TARRAY_SIZE(pReaderInfos)) {
7,829,890✔
846
    SVtableMergerReaderInfo *pLeftReaderInfo = TARRAY_GET_ELEM(pReaderInfos, left);
×
847
    SVtableMergerReaderInfo *pRightReaderInfo = TARRAY_GET_ELEM(pReaderInfos, right);
×
848

849
    if (pLeftReaderInfo->nextTs < pRightReaderInfo->nextTs) {
×
850
      return -1;
×
851
    } else if (pLeftReaderInfo->nextTs > pRightReaderInfo->nextTs) {
×
852
      return 1;
×
853
    }
854
  }
855
  // fallback to index comparison
856
  if (left < right) {
7,829,890✔
857
    return -1;
×
858
  } else if (left > right) {
7,829,890✔
859
    return 1;
7,829,890✔
860
  }
861
  return 0;
×
862
}
863

864
int32_t stVtableMergerInit(SSTriggerVtableMerger *pMerger, struct SStreamTriggerTask *pTask, SSDataBlock **ppDataBlock,
89,206✔
865
                           SFilterInfo **ppFilter, int32_t nVirDataCols) {
866
  int32_t code = TSDB_CODE_SUCCESS;
89,206✔
867
  int32_t lino = 0;
89,206✔
868

869
  pMerger->pTask = pTask;
89,206✔
870
  pMerger->pDataBlock = *ppDataBlock;
89,206✔
871
  *ppDataBlock = NULL;
89,206✔
872
  pMerger->pFilter = *ppFilter;
89,206✔
873
  *ppFilter = NULL;
89,206✔
874
  pMerger->nVirDataCols = nVirDataCols;
89,206✔
875
  pMerger->readRange = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN};
89,206✔
876

877
  pMerger->pReaderInfos = taosArrayInit(0, sizeof(SVtableMergerReaderInfo));
89,206✔
878
  QUERY_CHECK_NULL(pMerger->pReaderInfos, code, lino, _end, terrno);
89,206✔
879

880
  pMerger->pReaders = taosArrayInit(0, sizeof(SSTriggerTimestampSorter *));
89,206✔
881
  QUERY_CHECK_NULL(pMerger->pReaders, code, lino, _end, terrno);
89,206✔
882

883
  if (pMerger->pFilter != NULL) {
89,206✔
884
    SFilterColumnParam param = {.numOfCols = taosArrayGetSize(pMerger->pDataBlock->pDataBlock),
10,234✔
885
                                .pDataBlock = pMerger->pDataBlock->pDataBlock};
10,234✔
886
    code = filterSetDataFromSlotId(pMerger->pFilter, &param);
10,234✔
887
    QUERY_CHECK_CODE(code, lino, _end);
10,234✔
888
  }
889

890
_end:
78,972✔
891
  if (TSDB_CODE_SUCCESS != code) {
89,206✔
892
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
893
  }
894
  return code;
89,206✔
895
}
896

897
void stVtableMergerDestroy(void *ptr) {
89,206✔
898
  SSTriggerVtableMerger **ppMerger = ptr;
89,206✔
899
  if (ppMerger == NULL || *ppMerger == NULL) {
89,206✔
900
    return;
×
901
  }
902

903
  SSTriggerVtableMerger *pMerger = *ppMerger;
89,206✔
904
  if (pMerger->pDataBlock != NULL) {
89,206✔
905
    blockDataDestroy(pMerger->pDataBlock);
89,206✔
906
    pMerger->pDataBlock = NULL;
89,206✔
907
  }
908

909
  if (pMerger->pFilter != NULL) {
89,206✔
910
    filterFreeInfo(pMerger->pFilter);
10,234✔
911
    pMerger->pFilter = NULL;
10,234✔
912
  }
913

914
  if (pMerger->pReaderInfos != NULL) {
89,206✔
915
    taosArrayDestroy(pMerger->pReaderInfos);
89,206✔
916
    pMerger->pReaderInfos = NULL;
89,206✔
917
  }
918
  if (pMerger->pPseudoCols != NULL) {
89,206✔
919
    blockDataDestroy(pMerger->pPseudoCols);
2,193✔
920
    pMerger->pPseudoCols = NULL;
2,193✔
921
  }
922

923
  if (pMerger->pReaders != NULL) {
89,206✔
924
    taosArrayDestroyEx(pMerger->pReaders, stTimestampSorterDestroy);
89,206✔
925
    pMerger->pReaders = NULL;
89,206✔
926
  }
927

928
  if (pMerger->pDataMerger != NULL) {
89,206✔
929
    tMergeTreeDestroy(&pMerger->pDataMerger);
31,866✔
930
    pMerger->pDataMerger = NULL;
31,866✔
931
  }
932
  taosMemFreeClear(*ppMerger);
89,206✔
933
}
934

935
// Returns the merger to its unconfigured state so that stVtableMergerSetMergeInfo can be
// called again: flags are cleared, the read range becomes empty, the result data block is
// emptied, the reader infos are cleared and the pseudo-column block is destroyed.
// The readers and the merge tree are kept. A NULL merger is a no-op.
void stVtableMergerReset(SSTriggerVtableMerger *pMerger) {
  if (pMerger != NULL) {
    pMerger->flags = 0;
    pMerger->readRange = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN};
    blockDataEmpty(pMerger->pDataBlock);

    if (pMerger->pReaderInfos) {
      taosArrayClear(pMerger->pReaderInfos);
    }

    if (pMerger->pPseudoCols) {
      blockDataDestroy(pMerger->pPseudoCols);
      pMerger->pPseudoCols = NULL;
    }
  }
}
952

953
// Configures the merger for one read: the time range and the original tables to merge.
//
// Preconditions (TSDB_CODE_INVALID_PARA otherwise): pMerger->flags is 0, pRange is non-NULL with
// skey <= ekey, pReaderInfos is a non-NULL empty array and pReaders is non-NULL.
//
// One reader info is appended per element of pTableColRefs, with nextTs starting at
// pRange->skey. Readers are reused by index when pReaders already holds enough of them (they
// are reset first); missing readers are allocated and initialized. Every reader then gets its
// sort info for the table's otbUid. The merge tree is created on first use (capacity rounded up
// to a multiple of 8), recreated when it has fewer sources than tables, and rebuilt otherwise.
//
// With no tables the merger is marked empty. On success
// TRIGGER_VTABLE_MERGER_MASK_MERGE_INFO_SET is set; on failure the error is logged and returned.
int32_t stVtableMergerSetMergeInfo(SSTriggerVtableMerger *pMerger, STimeWindow *pRange, SArray *pTableColRefs) {
  int32_t             code = TSDB_CODE_SUCCESS;
  int32_t             lino = 0;
  SStreamTriggerTask *pTask = pMerger->pTask;
  SArray             *pReaderInfos = pMerger->pReaderInfos;
  SArray             *pReaders = pMerger->pReaders;

  QUERY_CHECK_CONDITION(pMerger->flags == 0, code, lino, _end, TSDB_CODE_INVALID_PARA);
  QUERY_CHECK_CONDITION(pRange != NULL && pRange->skey <= pRange->ekey, code, lino, _end, TSDB_CODE_INVALID_PARA);
  QUERY_CHECK_CONDITION(pReaderInfos != NULL && TARRAY_SIZE(pReaderInfos) == 0, code, lino, _end,
                        TSDB_CODE_INVALID_PARA);
  QUERY_CHECK_CONDITION(pReaders != NULL && TARRAY_SIZE(pReaders) >= 0, code, lino, _end, TSDB_CODE_INVALID_PARA);

  pMerger->readRange = *pRange;
  int32_t nTables = taosArrayGetSize(pTableColRefs);

  if (nTables == 0) {
    SET_TRIGGER_VTABLE_MERGER_EMPTY(pMerger);
    goto _end;
  }

  for (int32_t i = 0; i < nTables; i++) {
    SVtableMergerReaderInfo *pReaderInfo = taosArrayReserve(pReaderInfos, 1);
    QUERY_CHECK_NULL(pReaderInfo, code, lino, _end, terrno);
    pReaderInfo->pColRef = TARRAY_GET_ELEM(pTableColRefs, i);
    pReaderInfo->pDataBlock = NULL;
    pReaderInfo->startIdx = pReaderInfo->endIdx = 0;
    pReaderInfo->nextTs = pRange->skey;

    SSTriggerTimestampSorter *pReader = NULL;
    if (i < TARRAY_SIZE(pReaders)) {
      // reuse the reader created by an earlier call
      pReader = *(SSTriggerTimestampSorter **)TARRAY_GET_ELEM(pReaders, i);
      QUERY_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
      stTimestampSorterReset(pReader);
    } else {
      // allocate the reader before reserving its slot, so that a failed allocation can never
      // leave a slot in pReaders that does not hold a valid reader
      pReader = taosMemoryCalloc(1, sizeof(SSTriggerTimestampSorter));
      QUERY_CHECK_NULL(pReader, code, lino, _end, terrno);
      void *px = taosArrayReserve(pReaders, 1);
      if (px == NULL) {
        // capture the error first, then release the reader that has no slot to live in
        code = terrno;
        taosMemFreeClear(pReader);
        QUERY_CHECK_NULL(px, code, lino, _end, code);
      }
      *(SSTriggerTimestampSorter **)px = pReader;
      code = stTimestampSorterInit(pReader, pTask);
      QUERY_CHECK_CODE(code, lino, _end);
    }
    code = stTimestampSorterSetSortInfo(pReader, pRange, pReaderInfo->pColRef->otbUid, 0);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  if (pMerger->pDataMerger && pMerger->pDataMerger->numOfSources < nTables) {
    // destroy the old merger if it has less sources than needed
    tMergeTreeDestroy(&pMerger->pDataMerger);
    // make the "needs to be created" state explicit, as stVtableMergerDestroy does
    pMerger->pDataMerger = NULL;
  }
  if (pMerger->pDataMerger == NULL) {
    // round up to the nearest multiple of 8
    int32_t capacity = (nTables + 7) / 8 * 8;
    code = tMergeTreeCreate(&pMerger->pDataMerger, capacity, pReaderInfos, stVtableMergerReaderInfoCompare);
    QUERY_CHECK_CODE(code, lino, _end);
  } else {
    code = tMergeTreeRebuild(pMerger->pDataMerger);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (TSDB_CODE_SUCCESS != code) {
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  } else {
    BIT_FLAG_SET_MASK(pMerger->flags, TRIGGER_VTABLE_MERGER_MASK_MERGE_INFO_SET);
  }
  return code;
}
1022

1023
// Hands the pseudo-column data block over to the merger.
//
// The merger must not hold a pseudo-column block yet (TSDB_CODE_INVALID_PARA otherwise). On
// success the block is moved into pMerger->pPseudoCols and *ppDataBlock is set to NULL; the
// merger destroys it in stVtableMergerReset / stVtableMergerDestroy.
int32_t stVtableMergerSetPseudoCols(SSTriggerVtableMerger *pMerger, SSDataBlock **ppDataBlock) {
  int32_t             code = TSDB_CODE_SUCCESS;
  int32_t             lino = 0;
  SStreamTriggerTask *pTask = pMerger->pTask;

  QUERY_CHECK_CONDITION(pMerger->pPseudoCols == NULL, code, lino, _end, TSDB_CODE_INVALID_PARA);

  pMerger->pPseudoCols = *ppDataBlock;
  *ppDataBlock = NULL;

_end:
  if (TSDB_CODE_SUCCESS != code) {
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1040

1041
// Passes the metadata of the original tables to the readers.
//
// Requires that only TRIGGER_VTABLE_MERGER_MASK_MERGE_INFO_SET is set in pMerger->flags and
// that there is a reader for every reader info (TSDB_CODE_INVALID_PARA otherwise). For each
// reader info, the table meta is looked up in pOrigTableMetas by the table's otbUid; readers
// whose table has no entry are left untouched. On success
// TRIGGER_VTABLE_MERGER_MASK_META_DATA_SET is set.
int32_t stVtableMergerSetMetaDatas(SSTriggerVtableMerger *pMerger, SSHashObj *pOrigTableMetas) {
  int32_t             code = TSDB_CODE_SUCCESS;
  int32_t             lino = 0;
  SStreamTriggerTask *pTask = pMerger->pTask;
  SArray             *pSorters = pMerger->pReaders;

  QUERY_CHECK_CONDITION(pMerger->flags == TRIGGER_VTABLE_MERGER_MASK_MERGE_INFO_SET, code, lino, _end,
                        TSDB_CODE_INVALID_PARA);
  QUERY_CHECK_CONDITION(TARRAY_SIZE(pSorters) >= TARRAY_SIZE(pMerger->pReaderInfos), code, lino, _end,
                        TSDB_CODE_INVALID_PARA);

  int32_t nInfos = TARRAY_SIZE(pMerger->pReaderInfos);
  for (int32_t idx = 0; idx < nInfos; ++idx) {
    SVtableMergerReaderInfo *pInfo = TARRAY_GET_ELEM(pMerger->pReaderInfos, idx);
    SSTriggerTableMeta      *pTableMeta = tSimpleHashGet(pOrigTableMetas, &pInfo->pColRef->otbUid, sizeof(int64_t));
    if (pTableMeta == NULL) {
      // no metadata recorded for this original table
      continue;
    }
    SSTriggerTimestampSorter *pSorter = *(SSTriggerTimestampSorter **)TARRAY_GET_ELEM(pSorters, idx);
    code = stTimestampSorterSetMetaDatas(pSorter, pTableMeta);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  BIT_FLAG_SET_MASK(pMerger->flags, TRIGGER_VTABLE_MERGER_MASK_META_DATA_SET);

_end:
  if (TSDB_CODE_SUCCESS != code) {
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1071

1072
// Tells every reader in use that it has no metadata, by calling
// stTimestampSorterSetEmptyMetaDatas on the first TARRAY_SIZE(pReaderInfos) readers.
//
// Requires that only TRIGGER_VTABLE_MERGER_MASK_MERGE_INFO_SET is set in pMerger->flags and
// that there is a reader for every reader info (TSDB_CODE_INVALID_PARA otherwise).
//
// NOTE(review): unlike stVtableMergerSetMetaDatas, this function does not set
// TRIGGER_VTABLE_MERGER_MASK_META_DATA_SET on the merger - confirm that this is intended.
int32_t stVtableMergerSetEmptyMetaDatas(SSTriggerVtableMerger *pMerger) {
  int32_t             code = TSDB_CODE_SUCCESS;
  int32_t             lino = 0;
  SStreamTriggerTask *pTask = pMerger->pTask;
  SArray             *pSorters = pMerger->pReaders;

  QUERY_CHECK_CONDITION(pMerger->flags == TRIGGER_VTABLE_MERGER_MASK_MERGE_INFO_SET, code, lino, _end,
                        TSDB_CODE_INVALID_PARA);
  QUERY_CHECK_CONDITION(TARRAY_SIZE(pSorters) >= TARRAY_SIZE(pMerger->pReaderInfos), code, lino, _end,
                        TSDB_CODE_INVALID_PARA);

  int32_t nInfos = TARRAY_SIZE(pMerger->pReaderInfos);
  for (int32_t idx = 0; idx < nInfos; ++idx) {
    SSTriggerTimestampSorter *pSorter = *(SSTriggerTimestampSorter **)TARRAY_GET_ELEM(pSorters, idx);
    code = stTimestampSorterSetEmptyMetaDatas(pSorter);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (TSDB_CODE_SUCCESS != code) {
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1096

1097
// Number of rows of the merged (virtual) data block: stVtableMergerNextDataBlock passes it to
// blockDataEnsureCapacity and uses it as the row count when pre-filling the columns with NULL.
#define stVtableMerger_NUM_OF_ROWS_PER_BLOCK 4096
1098

1099
static int32_t stVtableMergerCopyDataBlock(SSTriggerVtableMerger *pMerger, SVtableMergerReaderInfo *pReaderInfo,
476,694✔
1100
                                           int64_t endTime, bool *pIsFull) {
1101
  int32_t             code = TSDB_CODE_SUCCESS;
476,694✔
1102
  int32_t             lino = 0;
476,694✔
1103
  SStreamTriggerTask *pTask = pMerger->pTask;
476,694✔
1104
  SSDataBlock        *pVirDataBlock = pMerger->pDataBlock;
476,694✔
1105
  SSDataBlock        *pOrigDataBlock = pReaderInfo->pDataBlock;
476,694✔
1106

1107
  SColumnInfoData *pVirTsCol = taosArrayGet(pVirDataBlock->pDataBlock, 0);
476,694✔
1108
  QUERY_CHECK_NULL(pVirTsCol, code, lino, _end, terrno);
476,694✔
1109
  int64_t *pVirTsData = (int64_t *)pVirTsCol->pData;
476,694✔
1110

1111
  SColumnInfoData *pOrigTsCol = taosArrayGet(pOrigDataBlock->pDataBlock, 0);
476,694✔
1112
  QUERY_CHECK_NULL(pOrigTsCol, code, lino, _end, terrno);
476,694✔
1113
  int64_t *pOrigTsData = (int64_t *)pOrigTsCol->pData;
476,694✔
1114

1115
  int32_t virStartIdx = blockDataGetNumOfRows(pVirDataBlock);
476,694✔
1116
  if (virStartIdx > 0 && pVirTsData[virStartIdx - 1] == pOrigTsData[pReaderInfo->startIdx]) {
476,694✔
1117
    // merge to the last row
1118
    --virStartIdx;
×
1119
  }
1120

1121
  void   *px = taosbsearch(&endTime, pOrigTsData + pReaderInfo->startIdx, pReaderInfo->endIdx - pReaderInfo->startIdx,
476,694✔
1122
                           sizeof(int64_t), compareInt64Val, TD_GT);
1123
  int32_t origEndIdx = (px != NULL) ? (POINTER_DISTANCE(px, pOrigTsData) / sizeof(int64_t)) : pReaderInfo->endIdx;
476,694✔
1124

1125
  // copy data from original data block to virtual data block
1126
  int32_t nRowsToCopy = TMIN(pVirDataBlock->info.capacity - virStartIdx, origEndIdx - pReaderInfo->startIdx);
476,694✔
1127
  code = colDataAssignNRows(pVirTsCol, virStartIdx, pOrigTsCol, pReaderInfo->startIdx, nRowsToCopy);
476,694✔
1128
  QUERY_CHECK_CODE(code, lino, _end);
476,694✔
1129
  int32_t nCols = taosArrayGetSize(pReaderInfo->pColRef->pColMatches);
476,694✔
1130
  for (int32_t i = 0; i < nCols; i++) {
1,394,385✔
1131
    SSTriggerColMatch *pColMatch = TARRAY_GET_ELEM(pReaderInfo->pColRef->pColMatches, i);
917,691✔
1132
    SColumnInfoData   *pVirCol = taosArrayGet(pVirDataBlock->pDataBlock, pColMatch->vtbSlotId);
917,691✔
1133
    QUERY_CHECK_NULL(pVirCol, code, lino, _end, terrno);
917,691✔
1134
    SColumnInfoData *pOrigCol = taosArrayGet(pOrigDataBlock->pDataBlock, i + 1);
917,691✔
1135
    QUERY_CHECK_NULL(pOrigCol, code, lino, _end, terrno);
917,691✔
1136
    code = colDataAssignNRows(pVirCol, virStartIdx, pOrigCol, pReaderInfo->startIdx, nRowsToCopy);
917,691✔
1137
    QUERY_CHECK_CODE(code, lino, _end);
917,691✔
1138
  }
1139

1140
  // update block info
1141
  pVirDataBlock->info.rows = virStartIdx + nRowsToCopy;
476,694✔
1142
  pReaderInfo->startIdx += nRowsToCopy;
476,694✔
1143
  *pIsFull = (pReaderInfo->startIdx < origEndIdx);
476,694✔
1144

1145
_end:
476,694✔
1146
  if (TSDB_CODE_SUCCESS != code) {
476,694✔
1147
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1148
  }
1149
  return code;
476,694✔
1150
}
1151

1152
int32_t stVtableMergerNextDataBlock(SSTriggerVtableMerger *pMerger, SSDataBlock **ppDataBlock) {
1,577,013✔
1153
  int32_t                  code = TSDB_CODE_SUCCESS;
1,577,013✔
1154
  int32_t                  lino = 0;
1,577,013✔
1155
  SStreamTriggerTask      *pTask = pMerger->pTask;
1,577,013✔
1156
  SSDataBlock             *pDataBlock = pMerger->pDataBlock;
1,577,013✔
1157
  SVtableMergerReaderInfo *pReaderInfo = NULL;
1,576,807✔
1158
  SColumnInfoData         *p = NULL;
1,576,807✔
1159

1160
  *ppDataBlock = NULL;
1,576,807✔
1161

1162
  if (IS_TRIGGER_VTABLE_MERGER_EMPTY(pMerger)) {
1,576,807✔
1163
    goto _end;
×
1164
  } else {
1165
    int32_t nrows = blockDataGetNumOfRows(pDataBlock);
1,576,807✔
1166
    if (nrows > 0) {
1,577,013✔
1167
      SColumnInfoData *pVirTsCol = taosArrayGet(pDataBlock->pDataBlock, 0);
10,965✔
1168
      QUERY_CHECK_NULL(pVirTsCol, code, lino, _end, terrno);
10,965✔
1169
      int64_t *pVirTsData = (int64_t *)pVirTsCol->pData;
10,965✔
1170
      if (pMerger->readRange.skey > pVirTsData[nrows - 1]) {
10,965✔
1171
        // need to get next data block
1172
        blockDataReset(pDataBlock);
×
1173
        nrows = 0;
×
1174
      }
1175
    }
1176
    code = blockDataEnsureCapacity(pDataBlock, stVtableMerger_NUM_OF_ROWS_PER_BLOCK);
1,577,013✔
1177
    QUERY_CHECK_CODE(code, lino, _end);
1,576,807✔
1178
    if (nrows == 0) {
1,576,807✔
1179
      // set all columns to NULL by default
1180
      for (int32_t i = 0; i < TARRAY_SIZE(pDataBlock->pDataBlock); i++) {
8,843,954✔
1181
        SColumnInfoData *pCol = taosArrayGet(pDataBlock->pDataBlock, i);
7,277,381✔
1182
        QUERY_CHECK_NULL(pCol, code, lino, _end, terrno);
7,278,112✔
1183
        colDataSetNNULL(pCol, 0, stVtableMerger_NUM_OF_ROWS_PER_BLOCK);
1184
      }
1185
    }
1186
  }
1187

1188
  bool    needFetchDataBlock = false;
1,577,538✔
1189
  int32_t nCols = taosArrayGetSize(pDataBlock->pDataBlock);
1,577,538✔
1190
  while (!IS_TRIGGER_VTABLE_MERGER_EMPTY(pMerger)) {
3,449,813✔
1191
    int32_t idx = tMergeTreeGetChosenIndex(pMerger->pDataMerger);
2,902,212✔
1192
    pReaderInfo = TARRAY_GET_ELEM(pMerger->pReaderInfos, idx);
2,902,212✔
1193
    if (pReaderInfo->nextTs > pMerger->readRange.ekey) {
2,902,212✔
1194
      SET_TRIGGER_VTABLE_MERGER_EMPTY(pMerger);
547,601✔
1195
      continue;
547,601✔
1196
    }
1197

1198
    if (pMerger->nVirDataCols < nCols && pMerger->pPseudoCols == NULL) {
2,354,817✔
1199
      // need to fetch pseudo columns
1200
      needFetchDataBlock = true;
550,525✔
1201
      break;
550,525✔
1202
    }
1203

1204
    if (pReaderInfo->pDataBlock == NULL) {
1,804,292✔
1205
      // get next data block from reader
1206
      SSTriggerTimestampSorter *pReader = *(SSTriggerTimestampSorter **)TARRAY_GET_ELEM(pMerger->pReaders, idx);
1,327,598✔
1207
      code = stTimestampSorterNextDataBlock(pReader, &pReaderInfo->pDataBlock, &pReaderInfo->startIdx,
1,327,598✔
1208
                                            &pReaderInfo->endIdx);
1209
      QUERY_CHECK_CODE(code, lino, _end);
1,327,598✔
1210
      if (pReaderInfo->pDataBlock == NULL) {
1,327,598✔
1211
        if (IS_TRIGGER_TIMESTAMP_SORTER_EMPTY(pReader)) {
850,904✔
1212
          // no more data
1213
          pReaderInfo->nextTs = INT64_MAX;
372,017✔
1214
        } else {
1215
          // need to fetch data block
1216
          needFetchDataBlock = true;
478,887✔
1217
          break;
478,887✔
1218
        }
1219
      } else {
1220
        SColumnInfoData *pOrigTsCol = taosArrayGet(pReaderInfo->pDataBlock->pDataBlock, 0);
476,694✔
1221
        QUERY_CHECK_NULL(pOrigTsCol, code, lino, _end, terrno);
476,694✔
1222
        int64_t *pOrigTsData = (int64_t *)pOrigTsCol->pData;
476,694✔
1223
        pReaderInfo->nextTs = pOrigTsData[pReaderInfo->startIdx];
476,694✔
1224
      }
1225
      code = tMergeTreeAdjust(pMerger->pDataMerger, tMergeTreeGetAdjustIndex(pMerger->pDataMerger));
848,711✔
1226
      QUERY_CHECK_CODE(code, lino, _end);
848,711✔
1227
    } else {
1228
      int64_t endTime = pMerger->readRange.ekey;
476,694✔
1229
      if (TARRAY_SIZE(pMerger->pReaderInfos) > 1) {
476,694✔
1230
        int32_t idx2 = 0;
×
1231
        code = stMergeTreeGetSecondIndex(pMerger->pDataMerger, &idx2);
×
1232
        SVtableMergerReaderInfo *pReaderInfo2 = TARRAY_GET_ELEM(pMerger->pReaderInfos, idx2);
×
1233
        if (pReaderInfo->nextTs == pReaderInfo2->nextTs) {
×
1234
          endTime = TMIN(pReaderInfo->nextTs, endTime);
×
1235
        } else {
1236
          endTime = TMIN(pReaderInfo2->nextTs - 1, endTime);
×
1237
        }
1238
      }
1239
      bool isFull = false;
476,694✔
1240
      code = stVtableMergerCopyDataBlock(pMerger, pReaderInfo, endTime, &isFull);
476,694✔
1241
      QUERY_CHECK_CODE(code, lino, _end);
476,694✔
1242
      SColumnInfoData *pOrigTsCol = taosArrayGet(pReaderInfo->pDataBlock->pDataBlock, 0);
476,694✔
1243
      QUERY_CHECK_NULL(pOrigTsCol, code, lino, _end, terrno);
476,694✔
1244
      int64_t *pOrigTsData = (int64_t *)pOrigTsCol->pData;
476,694✔
1245
      if (pReaderInfo->startIdx < pReaderInfo->endIdx) {
476,694✔
1246
        pReaderInfo->nextTs = pOrigTsData[pReaderInfo->startIdx];
×
1247
      } else {
1248
        pReaderInfo->nextTs = pOrigTsData[pReaderInfo->endIdx - 1] + 1;
476,694✔
1249
        pReaderInfo->pDataBlock = NULL;
476,694✔
1250
        pReaderInfo->startIdx = pReaderInfo->endIdx = 0;
476,694✔
1251
      }
1252
      code = tMergeTreeAdjust(pMerger->pDataMerger, tMergeTreeGetAdjustIndex(pMerger->pDataMerger));
476,694✔
1253
      QUERY_CHECK_CODE(code, lino, _end);
476,694✔
1254
      if (isFull) {
476,694✔
1255
        // result data block is full, return it
1256
        break;
×
1257
      }
1258
    }
1259
  }
1260

1261
  int32_t nrows = blockDataGetNumOfRows(pDataBlock);
1,577,013✔
1262
  if (!needFetchDataBlock && nrows > 0) {
1,577,013✔
1263
    if (pMerger->nVirDataCols < nCols) {
465,729✔
1264
      QUERY_CHECK_NULL(pMerger->pPseudoCols, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
465,729✔
1265
      int32_t nPseudoCols = blockDataGetNumOfCols(pMerger->pPseudoCols);
465,729✔
1266
      QUERY_CHECK_CONDITION(pMerger->nVirDataCols + nPseudoCols == nCols, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
465,729✔
1267
      for (int32_t i = 0; i < nPseudoCols; i++) {
1,276,595✔
1268
        SColumnInfoData *pSrc = TARRAY_GET_ELEM(pMerger->pPseudoCols->pDataBlock, i);
810,866✔
1269
        SColumnInfoData *pDst = TARRAY_GET_ELEM(pMerger->pDataBlock->pDataBlock, pMerger->nVirDataCols + i);
810,866✔
1270
        if (!colDataIsNull_s(pSrc, 0)) {
810,866✔
1271
          if (!IS_VAR_DATA_TYPE(pDst->info.type) && pDst->nullbitmap != NULL) {
810,866✔
1272
            int32_t bmLen = BitmapLen(nrows);
25,854✔
1273
            memset(pDst->nullbitmap, 0, bmLen);
25,854✔
1274
          }
1275
          code = colDataCopyNItems(pDst, 0, colDataGetData(pSrc, 0), nrows, false);
810,866✔
1276
          QUERY_CHECK_CODE(code, lino, _end);
810,866✔
1277
        }
1278
      }
1279
    }
1280
    if (pMerger->pFilter != NULL) {
465,729✔
1281
      int32_t status = 0;
2,924✔
1282
      code = filterExecute(pMerger->pFilter, pMerger->pDataBlock, &p, NULL, blockDataGetNumOfCols(pDataBlock), &status);
2,924✔
1283
      QUERY_CHECK_CODE(code, lino, _end);
2,924✔
1284
      code = trimDataBlock(pMerger->pDataBlock, nrows, (bool *)p->pData);
2,924✔
1285
      QUERY_CHECK_CODE(code, lino, _end);
2,924✔
1286
    }
1287
    *ppDataBlock = pMerger->pDataBlock;
465,729✔
1288
    SColumnInfoData *pVirTsCol = taosArrayGet(pDataBlock->pDataBlock, 0);
465,729✔
1289
    QUERY_CHECK_NULL(pVirTsCol, code, lino, _end, terrno);
465,729✔
1290
    int64_t *pVirTsData = (int64_t *)pVirTsCol->pData;
465,729✔
1291
    pMerger->readRange.skey = TMAX(pMerger->readRange.skey, pVirTsData[nrows - 1] + 1);
465,729✔
1292
  }
1293

1294
_end:
1,577,013✔
1295
  if (p != NULL) {
1,577,013✔
1296
    colDataDestroy(p);
2,924✔
1297
    taosMemoryFreeClear(p);
2,924✔
1298
  }
1299
  if (TSDB_CODE_SUCCESS != code) {
1,577,013✔
1300
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1301
  }
1302
  return code;
1,577,013✔
1303
}
1304

1305
int32_t stVtableMergerGetMetaToFetch(SSTriggerVtableMerger *pMerger, SSTriggerMetaData **ppMeta,
1,029,412✔
1306
                                     SSTriggerTableColRef **ppColRef) {
1307
  int32_t             code = TSDB_CODE_SUCCESS;
1,029,412✔
1308
  int32_t             lino = 0;
1,029,412✔
1309
  SStreamTriggerTask *pTask = pMerger->pTask;
1,029,412✔
1310

1311
  *ppMeta = NULL;
1,029,412✔
1312
  *ppColRef = NULL;
1,029,412✔
1313

1314
  if (IS_TRIGGER_VTABLE_MERGER_EMPTY(pMerger)) {
1,029,412✔
1315
    goto _end;
×
1316
  }
1317

1318
  QUERY_CHECK_CONDITION(BIT_FLAG_TEST_MASK(pMerger->flags, TRIGGER_VTABLE_MERGER_MASK_META_DATA_SET), code, lino, _end,
1,029,412✔
1319
                        TSDB_CODE_INVALID_PARA);
1320
  int32_t                   idx = tMergeTreeGetChosenIndex(pMerger->pDataMerger);
1,029,412✔
1321
  SVtableMergerReaderInfo  *pReaderInfo = TARRAY_GET_ELEM(pMerger->pReaderInfos, idx);
1,029,412✔
1322
  SSTriggerTimestampSorter *pReader = *(SSTriggerTimestampSorter **)TARRAY_GET_ELEM(pMerger->pReaders, idx);
1,029,412✔
1323
  if (pMerger->nVirDataCols < blockDataGetNumOfCols(pMerger->pDataBlock) && pMerger->pPseudoCols == NULL) {
1,029,412✔
1324
    *ppColRef = pReaderInfo->pColRef;
550,525✔
1325
  } else if (pReaderInfo->pDataBlock == NULL) {
478,887✔
1326
    code = stTimestampSorterGetMetaToFetch(pReader, ppMeta);
478,887✔
1327
    QUERY_CHECK_CODE(code, lino, _end);
478,887✔
1328
    if (*ppMeta != NULL) {
478,887✔
1329
      *ppColRef = pReaderInfo->pColRef;
478,887✔
1330
    }
1331
  }
1332

1333
_end:
×
1334
  if (TSDB_CODE_SUCCESS != code) {
1,029,412✔
1335
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1336
  }
1337
  return code;
1,029,412✔
1338
}
1339

1340
/*
 * Forward a bind-data-block request to the reader currently chosen by the merge tree.
 *
 * Fails with TSDB_CODE_INVALID_PARA when the merger is empty, when the meta-data-set flag
 * is not set, or when the chosen reader info already holds a data block. Otherwise returns
 * the code of stTimestampSorterBindDataBlock().
 */
int32_t stVtableMergerBindDataBlock(SSTriggerVtableMerger *pMerger, SSDataBlock **ppDataBlock) {
  int32_t             code = TSDB_CODE_SUCCESS;
  int32_t             lino = 0;
  SStreamTriggerTask *pTask = pMerger->pTask;

  QUERY_CHECK_CONDITION(!IS_TRIGGER_VTABLE_MERGER_EMPTY(pMerger), code, lino, _end, TSDB_CODE_INVALID_PARA);
  QUERY_CHECK_CONDITION(BIT_FLAG_TEST_MASK(pMerger->flags, TRIGGER_VTABLE_MERGER_MASK_META_DATA_SET), code, lino, _end,
                        TSDB_CODE_INVALID_PARA);

  int32_t                   chosen = tMergeTreeGetChosenIndex(pMerger->pDataMerger);
  SVtableMergerReaderInfo  *pChosenInfo = TARRAY_GET_ELEM(pMerger->pReaderInfos, chosen);
  SSTriggerTimestampSorter *pChosenReader = *(SSTriggerTimestampSorter **)TARRAY_GET_ELEM(pMerger->pReaders, chosen);

  // a block may only be bound while the chosen reader has none pending
  QUERY_CHECK_CONDITION(pChosenInfo->pDataBlock == NULL, code, lino, _end, TSDB_CODE_INVALID_PARA);

  code = stTimestampSorterBindDataBlock(pChosenReader, ppDataBlock);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (TSDB_CODE_SUCCESS != code) {
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1362

1363
typedef struct SNewTimestampSorterSlice {
1364
  int32_t startIdx;
1365
  int32_t endIdx;
1366
  TD_DLIST_NODE(SNewTimestampSorterSlice);
1367
} SNewTimestampSorterSlice;
1368

1369
typedef TD_DLIST(SNewTimestampSorterSlice) SNewTimestampSorterSliceList;
1370

1371
static int32_t stNewTimestampSorterSliceListCompare(const void *pLeft, const void *pRight, void *param) {
12,682,670✔
1372
  int32_t                      left = *(const int32_t *)pLeft;
12,682,670✔
1373
  int32_t                      right = *(const int32_t *)pRight;
12,682,670✔
1374
  SSTriggerNewTimestampSorter *pSorter = (SSTriggerNewTimestampSorter *)param;
12,682,876✔
1375
  SArray                      *pSliceLists = pSorter->pSliceLists;
12,682,876✔
1376

1377
  if (left < TARRAY_SIZE(pSorter->pSliceLists) && right < TARRAY_SIZE(pSorter->pSliceLists)) {
12,683,080✔
1378
    SNewTimestampSorterSliceList *pLeftList = TARRAY_GET_ELEM(pSorter->pSliceLists, left);
267,139✔
1379
    SNewTimestampSorterSliceList *pRightList = TARRAY_GET_ELEM(pSorter->pSliceLists, right);
267,139✔
1380

1381
    SColumnInfoData *pTsCol = TARRAY_GET_ELEM(pSorter->pDataBlock->pDataBlock, pSorter->tsSlotId);
267,139✔
1382
    int64_t         *pTsData = (int64_t *)pTsCol->pData;
267,139✔
1383
    int32_t          leftIdx = (TD_DLIST_HEAD(pLeftList) != NULL) ? TD_DLIST_HEAD(pLeftList)->startIdx : -1;
267,139✔
1384
    int64_t          leftTs = (leftIdx >= 0) ? pTsData[leftIdx] : INT64_MAX;
267,139✔
1385
    int32_t          rightIdx = (TD_DLIST_HEAD(pRightList) != NULL) ? TD_DLIST_HEAD(pRightList)->startIdx : -1;
267,139✔
1386
    int64_t          rightTs = (rightIdx >= 0) ? pTsData[rightIdx] : INT64_MAX;
267,139✔
1387

1388
    // compare by start timestamp first, then by start index
1389
    if (leftTs < rightTs) {
267,139✔
1390
      return -1;
104,531✔
1391
    } else if (leftTs > rightTs) {
162,608✔
1392
      return 1;
70,422✔
1393
    } else if (leftIdx < rightIdx) {
92,186✔
1394
      return 1;
5,006✔
1395
    } else if (leftIdx > rightIdx) {
87,180✔
1396
      return -1;
28,102✔
1397
    }
1398
  }
1399
_end:
12,474,813✔
1400
  // fallback to index comparison
1401
  if (left < right) {
12,474,813✔
1402
    return -1;
4,669✔
1403
  } else if (left > right) {
12,470,144✔
1404
    return 1;
12,470,144✔
1405
  }
1406
  return 0;
×
1407
}
1408

1409
/*
 * Initialize a timestamp sorter: remember the owning task and the version-column bias, then
 * create the (initially empty) slice buffer and slice-list arrays.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when one of the arrays cannot be created.
 */
int32_t stNewTimestampSorterInit(SSTriggerNewTimestampSorter *pSorter, struct SStreamTriggerTask *pTask,
                                 int32_t verColBias) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  pSorter->pTask = pTask;
  pSorter->verColBias = verColBias;

  // storage for the individual slices
  pSorter->pSliceBuf = taosArrayInit(0, sizeof(SNewTimestampSorterSlice));
  QUERY_CHECK_NULL(pSorter->pSliceBuf, code, lino, _end, terrno);

  // storage for the lists the slices are chained into
  pSorter->pSliceLists = taosArrayInit(0, sizeof(SNewTimestampSorterSliceList));
  QUERY_CHECK_NULL(pSorter->pSliceLists, code, lino, _end, terrno);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1428

1429
void stNewTimestampSorterDestroy(void *ptr) {
191,155✔
1430
  SSTriggerNewTimestampSorter **ppSorter = ptr;
191,155✔
1431
  if (ppSorter == NULL || *ppSorter == NULL) {
191,155✔
1432
    return;
×
1433
  }
1434

1435
  SSTriggerNewTimestampSorter *pSorter = *ppSorter;
191,155✔
1436
  if (pSorter->pSliceBuf != NULL) {
191,155✔
1437
    taosArrayDestroy(pSorter->pSliceBuf);
191,155✔
1438
    pSorter->pSliceBuf = NULL;
191,155✔
1439
  }
1440
  if (pSorter->pSliceLists != NULL) {
191,155✔
1441
    taosArrayDestroy(pSorter->pSliceLists);
191,155✔
1442
    pSorter->pSliceLists = NULL;
191,155✔
1443
  }
1444
  if (pSorter->pDataMerger != NULL) {
191,155✔
1445
    tMergeTreeDestroy(&pSorter->pDataMerger);
155,181✔
1446
  }
1447

1448
  taosMemoryFreeClear(*ppSorter);
191,155✔
1449
}
1450

1451
/*
 * Return a sorter to its idle state: mark it unused, drop the reference to the data block and
 * clear (but keep) the slice buffer and slice-list arrays. NULL is accepted and ignored.
 */
void stNewTimestampSorterReset(SSTriggerNewTimestampSorter *pSorter) {
  if (pSorter != NULL) {
    pSorter->inUse = false;
    pSorter->pDataBlock = NULL;

    if (pSorter->pSliceBuf != NULL) {
      taosArrayClear(pSorter->pSliceBuf);
    }

    if (pSorter->pSliceLists != NULL) {
      taosArrayClear(pSorter->pSliceLists);
    }
  }
}
1466

1467
int32_t stNewTimestampSorterSetData(SSTriggerNewTimestampSorter *pSorter, int64_t tbUid, int32_t tsSlotId,
1,751,675✔
1468
                                    STimeWindow *pReadRange, SObjList *pMetas, SSTriggerDataSlice *pSlice) {
1469
  int32_t             code = TSDB_CODE_SUCCESS;
1,751,675✔
1470
  int32_t             lino = 0;
1,751,675✔
1471
  SStreamTriggerTask *pTask = pSorter->pTask;
1,751,675✔
1472
  SSDataBlock        *pDataBlock = pSlice->pDataBlock;
1,751,471✔
1473

1474
  QUERY_CHECK_CONDITION(!pSorter->inUse, code, lino, _end, TSDB_CODE_INVALID_PARA);
1,751,471✔
1475

1476
  pSorter->inUse = true;
1,751,675✔
1477
  pSorter->pDataBlock = pDataBlock;
1,751,437✔
1478
  pSorter->tsSlotId = tsSlotId;
1,751,437✔
1479
  pDataBlock->info.id.uid = tbUid;
1,751,675✔
1480

1481
  // collect all data slices; data in each slice is in ascending order
1482
  SColumnInfoData *pTsCol = taosArrayGet(pDataBlock->pDataBlock, tsSlotId);
1,751,675✔
1483
  QUERY_CHECK_NULL(pTsCol, code, lino, _end, terrno);
1,751,437✔
1484
  int64_t         *pTsData = (int64_t *)pTsCol->pData;
1,751,437✔
1485
  SColumnInfoData *pVerCol = taosArrayGetLast(pDataBlock->pDataBlock);
1,751,675✔
1486
  QUERY_CHECK_NULL(pVerCol, code, lino, _end, terrno);
1,751,437✔
1487
  pVerCol -= pSorter->verColBias;
1,751,437✔
1488
  int64_t *pVerData = (int64_t *)pVerCol->pData;
1,751,437✔
1489

1490
  int32_t            i = pSlice->startIdx;
1,751,437✔
1491
  int64_t            lastTs = INT64_MIN;
1,751,675✔
1492
  SSTriggerMetaData *pMeta = NULL;
1,751,675✔
1493
  SObjListIter       iter;
1,751,092✔
1494
  taosObjListInitIter(pMetas, &iter, TOBJLIST_ITER_FORWARD);
1,751,675✔
1495
  while ((i < pSlice->endIdx) && (pMeta = (SSTriggerMetaData *)taosObjListIterNext(&iter)) != NULL) {
81,593,775✔
1496
    while (i < pSlice->endIdx && pVerData[i] < pMeta->ver) {
267,421,575✔
1497
      lastTs = TMAX(lastTs, pTsData[i]);
187,579,473✔
1498
      i++;
187,579,709✔
1499
    }
1500
    if (i < pSlice->endIdx && pVerData[i] > pMeta->ver) {
79,841,126✔
1501
      continue;
11,555,384✔
1502
    }
1503
    while (pTask->ignoreDisorder && i < pSlice->endIdx && lastTs != INT64_MIN &&
68,326,739✔
1504
           pTsData[i] <= lastTs - pTask->watermark) {
1,716,027✔
1505
      i++;
40,053✔
1506
    }
1507
    int64_t skey = TMAX(pMeta->skey, pReadRange->skey);
68,286,686✔
1508
    while (i < pSlice->endIdx && pVerData[i] == pMeta->ver && pTsData[i] < skey) {
343,787,561✔
1509
      i++;
275,500,877✔
1510
    }
1511
    SNewTimestampSorterSlice slice = {.startIdx = i};
68,286,920✔
1512
    int64_t                  ekey = TMIN(pMeta->ekey, pReadRange->ekey);
68,286,952✔
1513
    while (i < pSlice->endIdx && pVerData[i] == pMeta->ver && pTsData[i] <= ekey) {
77,949,502✔
1514
      i++;
9,661,902✔
1515
    }
1516
    slice.endIdx = i;
68,287,628✔
1517
    if (slice.startIdx < slice.endIdx) {
68,287,628✔
1518
      SNewTimestampSorterSlice *pLastSlice = NULL;
8,766,584✔
1519
      if (TARRAY_SIZE(pSorter->pSliceBuf) > 0) {
8,766,584✔
1520
        pLastSlice = TARRAY_GET_ELEM(pSorter->pSliceBuf, TARRAY_SIZE(pSorter->pSliceBuf) - 1);
7,040,984✔
1521
      }
1522
      if (pLastSlice != NULL && pLastSlice->endIdx == slice.startIdx &&
8,766,616✔
1523
          pTsData[pLastSlice->endIdx - 1] < pTsData[slice.startIdx]) {
7,021,307✔
1524
        // merge with the last slice
1525
        pLastSlice->endIdx = slice.endIdx;
6,973,253✔
1526
      } else {
1527
        void *px = taosArrayPush(pSorter->pSliceBuf, &slice);
1,793,157✔
1528
        QUERY_CHECK_NULL(px, code, lino, _end, terrno);
1,793,159✔
1529
      }
1530
    }
1531
    if (i > 0) {
68,287,662✔
1532
      lastTs = TMAX(lastTs, pTsData[i - 1]);
68,284,441✔
1533
    }
1534
  }
1535

1536
  // combine slices into lists; data in each list is in ascending order
1537
  for (int32_t i = 0; i < TARRAY_SIZE(pSorter->pSliceBuf); i++) {
3,544,766✔
1538
    SNewTimestampSorterSlice     *pSlice = TARRAY_GET_ELEM(pSorter->pSliceBuf, i);
1,793,567✔
1539
    SNewTimestampSorterSliceList *pList = NULL;
1,793,567✔
1540
    int64_t                       firstTs = pTsData[pSlice->startIdx];
1,793,567✔
1541
    for (int32_t j = 0; j < TARRAY_SIZE(pSorter->pSliceLists); j++) {
1,876,236✔
1542
      SNewTimestampSorterSliceList *pTempList = TARRAY_GET_ELEM(pSorter->pSliceLists, j);
91,116✔
1543
      if (pTsData[TD_DLIST_TAIL(pTempList)->endIdx - 1] < firstTs) {
91,116✔
1544
        pList = pTempList;
8,447✔
1545
        break;
8,447✔
1546
      }
1547
    }
1548
    if (pList == NULL) {
1,793,567✔
1549
      pList = taosArrayReserve(pSorter->pSliceLists, 1);
1,785,120✔
1550
      QUERY_CHECK_NULL(pList, code, lino, _end, terrno);
1,784,916✔
1551
    }
1552
    TD_DLIST_APPEND(pList, pSlice);
1,793,363✔
1553
  }
1554

1555
  if (stDebugFlag & DEBUG_DEBUG) {
1,751,437✔
1556
    for (int32_t i = 0; i < TARRAY_SIZE(pSorter->pSliceLists); i++) {
3,091,503✔
1557
      SNewTimestampSorterSliceList *pList = TARRAY_GET_ELEM(pSorter->pSliceLists, i);
1,559,996✔
1558
      SNewTimestampSorterSlice     *pSlice = TD_DLIST_HEAD(pList);
1,559,996✔
1559
      while (pSlice != NULL) {
3,128,439✔
1560
        ST_TASK_DLOG("Slice List %d: add [%d, %d)", i, pSlice->startIdx, pSlice->endIdx);
1,568,443✔
1561
        pSlice = TD_DLIST_NODE_NEXT(pSlice);
1,568,443✔
1562
      }
1563
    }
1564
  }
1565

1566
  if (TARRAY_SIZE(pSorter->pSliceLists) == 0) {
1,751,437✔
1567
    goto _end;
25,633✔
1568
  }
1569

1570
  // merge data from all lists
1571
  if (pSorter->pDataMerger != NULL && pSorter->pDataMerger->numOfSources < TARRAY_SIZE(pSorter->pSliceLists)) {
1,725,838✔
1572
    tMergeTreeDestroy(&pSorter->pDataMerger);
203✔
1573
  }
1574
  if (pSorter->pDataMerger == NULL) {
1,726,042✔
1575
    // round up to the nearest multiple of 8
1576
    int32_t capacity = (TARRAY_SIZE(pSorter->pSliceLists) + 7) / 8 * 8;
155,384✔
1577
    code = tMergeTreeCreate(&pSorter->pDataMerger, capacity, pSorter, stNewTimestampSorterSliceListCompare);
155,384✔
1578
    QUERY_CHECK_CODE(code, lino, _end);
155,178✔
1579
  } else {
1580
    code = tMergeTreeRebuild(pSorter->pDataMerger);
1,570,454✔
1581
    QUERY_CHECK_CODE(code, lino, _end);
1,570,658✔
1582
  }
1583

1584
_end:
1,751,193✔
1585
  if (TSDB_CODE_SUCCESS != code) {
1,751,675✔
1586
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1587
  }
1588
  return code;
1,751,675✔
1589
}
1590

1591
/*
 * Hand out the next row range, in timestamp order, of the block loaded into the sorter.
 *
 * On success [*pStartIdx, *pEndIdx) is the range to consume and, when ppDataBlock is not
 * NULL, *ppDataBlock is the sorter's data block. When nothing is left the range is [0, 0)
 * and *ppDataBlock (if given) is NULL.
 *
 * With several slice lists the range is cut before the smallest head timestamp of the other
 * lists; rows at the head of other lists carrying the same timestamp as the first returned
 * row are dropped from those lists.
 *
 * Returns TSDB_CODE_INVALID_PARA when the sorter is not in use, TSDB_CODE_INTERNAL_ERROR on
 * an inconsistent state, or a merge-tree error code.
 */
int32_t stNewTimestampSorterNextDataBlock(SSTriggerNewTimestampSorter *pSorter, SSDataBlock **ppDataBlock,
                                          int32_t *pStartIdx, int32_t *pEndIdx) {
  int32_t             code = TSDB_CODE_SUCCESS;
  int32_t             lino = 0;
  SStreamTriggerTask *pTask = pSorter->pTask;
  bool                needRebuild = false;

  QUERY_CHECK_CONDITION(pSorter->inUse, code, lino, _end, TSDB_CODE_INVALID_PARA);

  if (ppDataBlock != NULL) {
    *ppDataBlock = NULL;
  }
  *pStartIdx = 0;
  *pEndIdx = 0;

  if (TARRAY_SIZE(pSorter->pSliceLists) == 0) {
    goto _end;
  }

  int32_t                       chosen = tMergeTreeGetChosenIndex(pSorter->pDataMerger);
  SNewTimestampSorterSliceList *pChosenList = TARRAY_GET_ELEM(pSorter->pSliceLists, chosen);
  SNewTimestampSorterSlice     *pHead = TD_DLIST_HEAD(pChosenList);
  if (pHead == NULL) {
    // the chosen list is exhausted
    goto _end;
  }

  if (ppDataBlock != NULL) {
    *ppDataBlock = pSorter->pDataBlock;
  }
  *pStartIdx = pHead->startIdx;

  if (TARRAY_SIZE(pSorter->pSliceLists) == 1) {
    // single list: the whole head slice can be returned as is
    *pEndIdx = pHead->endIdx;
    TD_DLIST_POP(pChosenList, pHead);
    goto _end;
  }

  SColumnInfoData *pTsCol = TARRAY_GET_ELEM(pSorter->pDataBlock->pDataBlock, pSorter->tsSlotId);
  int64_t         *pTsData = (int64_t *)pTsCol->pData;
  int64_t          startTs = pTsData[pHead->startIdx];
  int64_t          endTs = INT64_MAX;

  // find the smallest head timestamp among the other lists
  for (int32_t k = 0; k < TARRAY_SIZE(pSorter->pSliceLists); k++) {
    SNewTimestampSorterSliceList *pOtherList = TARRAY_GET_ELEM(pSorter->pSliceLists, k);
    SNewTimestampSorterSlice     *pOther = TD_DLIST_HEAD(pOtherList);
    if (k == chosen || pOther == NULL) {
      continue;
    }

    if (pTsData[pOther->startIdx] == startTs) {
      // skip the current row
      ST_TASK_DLOG("Slice List %d: pop [%d, %d)", k, pOther->startIdx, pOther->startIdx + 1);
      needRebuild = true;
      pOther->startIdx++;
      if (pOther->startIdx == pOther->endIdx) {
        TD_DLIST_POP(pOtherList, pOther);
        pOther = TD_DLIST_HEAD(pOtherList);
      }
    }

    if (pOther != NULL && pTsData[pOther->startIdx] < endTs) {
      endTs = pTsData[pOther->startIdx];
    }
  }
  QUERY_CHECK_CONDITION(endTs > startTs, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);

  if (pTsData[pHead->endIdx - 1] >= endTs) {
    // only a prefix of the head slice lies before endTs; keep the remainder in the list
    void *pFound = taosbsearch(&endTs, pTsData + pHead->startIdx, pHead->endIdx - pHead->startIdx, sizeof(int64_t),
                               compareInt64Val, TD_GE);
    QUERY_CHECK_NULL(pFound, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
    *pEndIdx = POINTER_DISTANCE(pFound, pTsData) / sizeof(int64_t);
    pHead->startIdx = *pEndIdx;
  } else {
    // the whole head slice lies before endTs
    *pEndIdx = pHead->endIdx;
    TD_DLIST_POP(pChosenList, pHead);
    pHead = TD_DLIST_HEAD(pChosenList);
  }

  ST_TASK_DLOG("Slice List %d: pop [%d, %d)", chosen, *pStartIdx, *pEndIdx);

  if (needRebuild) {
    // heads of other lists changed as well
    code = tMergeTreeRebuild(pSorter->pDataMerger);
    QUERY_CHECK_CODE(code, lino, _end);
  } else if (pHead == NULL || pTsData[pHead->startIdx] >= endTs) {
    code = tMergeTreeAdjust(pSorter->pDataMerger, tMergeTreeGetAdjustIndex(pSorter->pDataMerger));
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (TSDB_CODE_SUCCESS != code) {
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1683

1684
typedef struct SNewVtableMergerReaderInfo {
1685
  SSTriggerNewTimestampSorter *pReader;
1686
  SSTriggerTableColRef        *pColRef;
1687
  int32_t                      startIdx;
1688
  int32_t                      endIdx;
1689
} SNewVtableMergerReaderInfo;
1690

1691
static int32_t stNewVtableMergerReaderInfoCompare(const void *pLeft, const void *pRight, void *param) {
7,402,405✔
1692
  int32_t left = *(const int32_t *)pLeft;
7,402,405✔
1693
  int32_t right = *(const int32_t *)pRight;
7,402,405✔
1694
  SArray *pReaderInfos = (SArray *)param;
7,402,405✔
1695

1696
  if (left < TARRAY_SIZE(pReaderInfos) && right < TARRAY_SIZE(pReaderInfos)) {
7,402,405✔
1697
    SNewVtableMergerReaderInfo *pLeftReaderInfo = TARRAY_GET_ELEM(pReaderInfos, left);
4,104✔
1698
    SNewVtableMergerReaderInfo *pRightReaderInfo = TARRAY_GET_ELEM(pReaderInfos, right);
4,104✔
1699

1700
    int64_t leftTs = INT64_MAX;
4,104✔
1701
    if (pLeftReaderInfo->startIdx < pLeftReaderInfo->endIdx) {
4,104✔
1702
      SColumnInfoData *pTsCol =
3,024✔
1703
          TARRAY_GET_ELEM(pLeftReaderInfo->pReader->pDataBlock->pDataBlock, pLeftReaderInfo->pReader->tsSlotId);
1,512✔
1704
      int64_t *pTsData = (int64_t *)pTsCol->pData;
1,512✔
1705
      leftTs = pTsData[pLeftReaderInfo->startIdx];
1,512✔
1706
    }
1707
    int64_t rightTs = INT64_MAX;
4,104✔
1708
    if (pRightReaderInfo->startIdx < pRightReaderInfo->endIdx) {
4,104✔
1709
      SColumnInfoData *pTsCol =
3,456✔
1710
          TARRAY_GET_ELEM(pRightReaderInfo->pReader->pDataBlock->pDataBlock, pRightReaderInfo->pReader->tsSlotId);
1,728✔
1711
      int64_t *pTsData = (int64_t *)pTsCol->pData;
1,728✔
1712
      rightTs = pTsData[pRightReaderInfo->startIdx];
1,728✔
1713
    }
1714

1715
    if (leftTs < rightTs) {
4,104✔
1716
      return -1;
864✔
1717
    } else if (leftTs > rightTs) {
3,240✔
1718
      return 1;
1,728✔
1719
    }
1720
  }
1721
  // fallback to index comparison
1722
  if (left < right) {
7,399,813✔
1723
    return -1;
864✔
1724
  } else if (left > right) {
7,398,949✔
1725
    return 1;
7,398,949✔
1726
  }
1727
  return 0;
×
1728
}
1729

1730
#define VTABLE_MERGER_NROWS_PER_BLOCK 4096
1731

1732
static void stNewVtableMergerDestoryReaderInfo(void *ptr) {
75,862✔
1733
  SNewVtableMergerReaderInfo *pInfo = ptr;
75,862✔
1734
  if (pInfo && pInfo->pReader != NULL) {
75,862✔
1735
    stNewTimestampSorterDestroy(&pInfo->pReader);
75,862✔
1736
  }
1737
}
75,862✔
1738

1739
int32_t stNewVtableMergerInit(SSTriggerNewVtableMerger *pMerger, struct SStreamTriggerTask *pTask,
92,780✔
1740
                              SSDataBlock *pDataBlock, bool *pIsPseudoCol, SNode *pFilter) {
1741
  int32_t code = TSDB_CODE_SUCCESS;
92,780✔
1742
  int32_t lino = 0;
92,780✔
1743

1744
  pMerger->pTask = pTask;
92,780✔
1745
  code = createOneDataBlock(pDataBlock, false, &pMerger->pDataBlock);
92,780✔
1746
  QUERY_CHECK_CODE(code, lino, _end);
92,780✔
1747
  bool hasPseudo = false;
92,780✔
1748
  for (int32_t i = 0; i < TARRAY_SIZE(pMerger->pDataBlock->pDataBlock); i++) {
310,729✔
1749
    if (pIsPseudoCol[i]) {
278,962✔
1750
      hasPseudo = true;
61,013✔
1751
      break;
61,013✔
1752
    }
1753
  }
1754
  pMerger->pIsPseudoCol = hasPseudo ? pIsPseudoCol : NULL;
92,780✔
1755
  code = filterInitFromNode(pFilter, &pMerger->pFilter, 0, NULL);
92,780✔
1756
  QUERY_CHECK_CODE(code, lino, _end);
92,780✔
1757

1758
  code = blockDataEnsureCapacity(pMerger->pDataBlock, VTABLE_MERGER_NROWS_PER_BLOCK);
92,780✔
1759
  QUERY_CHECK_CODE(code, lino, _end);
92,780✔
1760

1761
  if (pMerger->pIsPseudoCol != NULL) {
92,780✔
1762
    pMerger->pPseudoColValues = taosMemoryCalloc(1, sizeof(SSDataBlock));
61,013✔
1763
    QUERY_CHECK_NULL(pMerger->pPseudoColValues, code, lino, _end, terrno);
61,013✔
1764
  }
1765

1766
  pMerger->pReaderInfos = taosArrayInit(0, sizeof(SNewVtableMergerReaderInfo));
92,780✔
1767
  QUERY_CHECK_NULL(pMerger->pReaderInfos, code, lino, _end, terrno);
92,780✔
1768

1769
  if (pMerger->pFilter != NULL) {
92,780✔
1770
    SFilterColumnParam param = {.numOfCols = taosArrayGetSize(pMerger->pDataBlock->pDataBlock),
7,310✔
1771
                                .pDataBlock = pMerger->pDataBlock->pDataBlock};
7,310✔
1772
    code = filterSetDataFromSlotId(pMerger->pFilter, &param);
7,310✔
1773
    QUERY_CHECK_CODE(code, lino, _end);
7,310✔
1774
  }
1775

1776
_end:
85,470✔
1777
  if (TSDB_CODE_SUCCESS != code) {
92,780✔
1778
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1779
  }
1780
  return code;
92,780✔
1781
}
1782

1783
// True when the merger has pseudo columns configured but its pseudo-column value block
// currently holds no rows.
bool stNewVtableMergerNeedPseudoCols(SSTriggerNewVtableMerger *pMerger) {
  if (pMerger->pIsPseudoCol == NULL) {
    return false;
  }
  return blockDataGetNumOfRows(pMerger->pPseudoColValues) == 0;
}
1786

1787
void stNewVtableMergerDestroy(void *ptr) {
92,780✔
1788
  SSTriggerNewVtableMerger **ppMerger = ptr;
92,780✔
1789
  if (ppMerger == NULL || *ppMerger == NULL) {
92,780✔
1790
    return;
×
1791
  }
1792

1793
  SSTriggerNewVtableMerger *pMerger = *ppMerger;
92,780✔
1794
  if (pMerger->pDataBlock != NULL) {
92,780✔
1795
    blockDataDestroy(pMerger->pDataBlock);
92,780✔
1796
    pMerger->pDataBlock = NULL;
92,780✔
1797
  }
1798
  if (pMerger->pFilter != NULL) {
92,780✔
1799
    filterFreeInfo(pMerger->pFilter);
7,310✔
1800
    pMerger->pFilter = NULL;
7,310✔
1801
  }
1802

1803
  if (pMerger->pPseudoColValues != NULL) {
92,780✔
1804
    blockDataDestroy(pMerger->pPseudoColValues);
61,013✔
1805
    pMerger->pPseudoColValues = NULL;
61,013✔
1806
  }
1807

1808
  if (pMerger->pReaderInfos != NULL) {
92,780✔
1809
    taosArrayDestroyEx(pMerger->pReaderInfos, stNewVtableMergerDestoryReaderInfo);
92,780✔
1810
    pMerger->pReaderInfos = NULL;
92,780✔
1811
  }
1812
  if (pMerger->pDataMerger != NULL) {
92,780✔
1813
    tMergeTreeDestroy(&pMerger->pDataMerger);
75,430✔
1814
    pMerger->pDataMerger = NULL;
75,430✔
1815
  }
1816
  taosMemFreeClear(*ppMerger);
92,780✔
1817
}
1818

1819
/*
 * Return a merger to its idle state: mark it unused, empty the result block and the
 * pseudo-column value block, reset every reader info (sorter and row range) and set the
 * active reader count to zero. The reader infos and their sorters are kept.
 * NULL is accepted and ignored.
 */
void stNewVtableMergerReset(SSTriggerNewVtableMerger *pMerger) {
  if (pMerger == NULL) {
    return;
  }

  pMerger->inUse = false;
  blockDataEmpty(pMerger->pDataBlock);

  if (pMerger->pPseudoColValues != NULL) {
    blockDataEmpty(pMerger->pPseudoColValues);
  }

  if (pMerger->pReaderInfos == NULL) {
    return;
  }

  for (int32_t r = 0; r < TARRAY_SIZE(pMerger->pReaderInfos); r++) {
    SNewVtableMergerReaderInfo *pInfo = TARRAY_GET_ELEM(pMerger->pReaderInfos, r);
    stNewTimestampSorterReset(pInfo->pReader);
    pInfo->startIdx = 0;
    pInfo->endIdx = 0;
  }
  pMerger->nReaders = 0;
}
1841

1842
int32_t stNewVtableMergerSetData(SSTriggerNewVtableMerger *pMerger, int64_t vtbUid, int32_t tsSlotId,
1,056,405✔
1843
                                 STimeWindow *pReadRange, SObjList *pTableUids, SArray *pTableColRefs,
1844
                                 SSHashObj *pMetas, SSHashObj *pSlices) {
1845
  int32_t             code = TSDB_CODE_SUCCESS;
1,056,405✔
1846
  int32_t             lino = 0;
1,056,405✔
1847
  SStreamTriggerTask *pTask = pMerger->pTask;
1,056,405✔
1848

1849
  QUERY_CHECK_CONDITION(!pMerger->inUse, code, lino, _end, TSDB_CODE_INVALID_PARA);
1,056,405✔
1850
  QUERY_CHECK_CONDITION(!stNewVtableMergerNeedPseudoCols(pMerger), code, lino, _end, TSDB_CODE_INVALID_PARA);
1,056,405✔
1851

1852
  pMerger->inUse = true;
1,056,405✔
1853
  pMerger->nReaders = 0;
1,056,405✔
1854
  pMerger->pDataBlock->info.id.uid = vtbUid;
1,056,405✔
1855
  pMerger->tsSlotId = tsSlotId;
1,056,405✔
1856

1857
  int32_t      nTables = taosArrayGetSize(pTableColRefs);
1,056,405✔
1858
  int64_t     *ar = NULL;
1,056,405✔
1859
  SObjListIter iter = {0};
1,056,405✔
1860
  taosObjListInitIter(pTableUids, &iter, TOBJLIST_ITER_FORWARD);
1,056,405✔
1861
  while ((ar = taosObjListIterNext(&iter)) != NULL) {
2,147,948✔
1862
    if (ar[0] != vtbUid) {
1,091,543✔
1863
      continue;
33,626✔
1864
    }
1865
    int64_t               otbUid = ar[1];
1,057,917✔
1866
    SSTriggerTableColRef *pColRef = NULL;
1,057,917✔
1867
    for (int32_t i = 0; i < nTables; i++) {
1,059,429✔
1868
      SSTriggerTableColRef *pTmpColRef = TARRAY_GET_ELEM(pTableColRefs, i);
1,059,429✔
1869
      if (pTmpColRef->otbUid == otbUid) {
1,059,429✔
1870
        pColRef = pTmpColRef;
1,057,917✔
1871
        break;
1,057,917✔
1872
      }
1873
    }
1874
    SSTriggerDataSlice *pSlice = tSimpleHashGet(pSlices, &otbUid, sizeof(int64_t));
1,057,917✔
1875
    QUERY_CHECK_NULL(pSlice, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
1,057,917✔
1876
    SNewVtableMergerReaderInfo *pReaderInfo = NULL;
1,057,917✔
1877
    if (pMerger->nReaders < TARRAY_SIZE(pMerger->pReaderInfos)) {
1,057,917✔
1878
      pReaderInfo = TARRAY_GET_ELEM(pMerger->pReaderInfos, pMerger->nReaders);
982,055✔
1879
    } else {
1880
      pReaderInfo = taosArrayReserve(pMerger->pReaderInfos, 1);
75,862✔
1881
      QUERY_CHECK_NULL(pReaderInfo, code, lino, _end, terrno);
75,862✔
1882
    }
1883
    pMerger->nReaders++;
1,057,917✔
1884
    if (pReaderInfo->pReader == NULL) {
1,057,917✔
1885
      pReaderInfo->pReader = taosMemoryCalloc(1, sizeof(SSTriggerNewTimestampSorter));
75,862✔
1886
      QUERY_CHECK_NULL(pReaderInfo->pReader, code, lino, _end, terrno);
75,862✔
1887
      code = stNewTimestampSorterInit(pReaderInfo->pReader, pTask, 0);
75,862✔
1888
      QUERY_CHECK_CODE(code, lino, _end);
75,862✔
1889
    }
1890
    SObjList *pMeta = tSimpleHashGet(pMetas, &pColRef->otbVgId, sizeof(int32_t));
1,057,917✔
1891
    QUERY_CHECK_NULL(pMeta, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
1,057,917✔
1892
    code = stNewTimestampSorterSetData(pReaderInfo->pReader, pColRef->otbUid, tsSlotId, pReadRange, pMeta, pSlice);
1,057,917✔
1893
    QUERY_CHECK_CODE(code, lino, _end);
1,057,917✔
1894
    pReaderInfo->pColRef = pColRef;
1,057,917✔
1895
    code = stNewTimestampSorterNextDataBlock(pReaderInfo->pReader, NULL, &pReaderInfo->startIdx, &pReaderInfo->endIdx);
1,057,917✔
1896
    QUERY_CHECK_CODE(code, lino, _end);
1,057,917✔
1897
  }
1898

1899
  if (pMerger->nReaders == 0) {
1,056,405✔
1900
    goto _end;
×
1901
  }
1902

1903
  if (pMerger->pDataMerger != NULL && pMerger->pDataMerger->numOfSources < pMerger->nReaders) {
1,056,405✔
1904
    tMergeTreeDestroy(&pMerger->pDataMerger);
×
1905
  }
1906
  if (pMerger->pDataMerger == NULL) {
1,056,199✔
1907
    // round up to the nearest multiple of 8
1908
    int32_t capacity = (pMerger->nReaders + 7) / 8 * 8;
75,430✔
1909
    code = tMergeTreeCreate(&pMerger->pDataMerger, capacity, pMerger->pReaderInfos, stNewVtableMergerReaderInfoCompare);
75,430✔
1910
    QUERY_CHECK_CODE(code, lino, _end);
75,430✔
1911
  } else {
1912
    code = tMergeTreeRebuild(pMerger->pDataMerger);
980,769✔
1913
    QUERY_CHECK_CODE(code, lino, _end);
980,975✔
1914
  }
1915

1916
_end:
1,056,405✔
1917
  if (TSDB_CODE_SUCCESS != code) {
1,056,405✔
1918
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1919
  }
1920
  return code;
1,056,405✔
1921
}
1922

1923
/*
 * Move the next piece of data from the reader currently chosen by the merge tree into the
 * merger's output block (pMerger->pDataBlock).
 *
 *  - With exactly one reader, as many rows as fit into the output block are copied at once.
 *  - With several readers, a single row is copied and the merge tree is adjusted afterwards.
 *
 * When the source row has the same timestamp as the last row already present in the output
 * block, the data is written onto that last row instead of starting a new one.
 *
 * @param pMerger  merger state; pMerger->nReaders must be greater than zero
 * @param filled   set to true when data was written to the output block; set to false when the
 *                 chosen reader has no pending rows or the output block is already full
 *                 (left untouched if the reader-count check fails)
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_INVALID_PARA when there are no readers or, in
 *         multi-reader mode, the source timestamp is NULL; otherwise the error code reported by
 *         the failing callee
 */
static int32_t stNewVtableMergerDoRetrieve(SSTriggerNewVtableMerger *pMerger, bool *filled) {
  int32_t             code = TSDB_CODE_SUCCESS;
  int32_t             lino = 0;
  SStreamTriggerTask *pTask = pMerger->pTask;

  QUERY_CHECK_CONDITION(pMerger->nReaders > 0, code, lino, _end, TSDB_CODE_INVALID_PARA);

  *filled = false;

  int32_t                     chosen = tMergeTreeGetChosenIndex(pMerger->pDataMerger);
  SNewVtableMergerReaderInfo *pInfo = TARRAY_GET_ELEM(pMerger->pReaderInfos, chosen);
  if (pInfo->startIdx >= pInfo->endIdx) {
    // the chosen reader has nothing left to hand out
    goto _end;
  }

  // timestamp column of the output (virtual table) block
  SSDataBlock     *pDstBlock = pMerger->pDataBlock;
  SColumnInfoData *pDstTsCol = taosArrayGet(pDstBlock->pDataBlock, pMerger->tsSlotId);
  QUERY_CHECK_NULL(pDstTsCol, code, lino, _end, terrno);
  int64_t *pDstTs = (int64_t *)pDstTsCol->pData;

  // timestamp column of the chosen reader's (original table) block
  SSDataBlock     *pSrcBlock = pInfo->pReader->pDataBlock;
  SColumnInfoData *pSrcTsCol = taosArrayGet(pSrcBlock->pDataBlock, pMerger->tsSlotId);
  QUERY_CHECK_NULL(pSrcTsCol, code, lino, _end, terrno);
  int64_t *pSrcTs = (int64_t *)pSrcTsCol->pData;

  // destination row: a new row, or the last existing row when the timestamps coincide
  int32_t dstRow = blockDataGetNumOfRows(pDstBlock);
  if (dstRow > 0 && pDstTs[dstRow - 1] == pSrcTs[pInfo->startIdx]) {
    dstRow--;
  }

  if (dstRow >= VTABLE_MERGER_NROWS_PER_BLOCK) {
    // no room left in the output block
    goto _end;
  }

  *filled = true;
  int32_t    nCols = taosArrayGetSize(pInfo->pColRef->pNewColMatches);
  const bool singleReader = (pMerger->nReaders == 1);
  int32_t    nCopy = 1;  // multi-reader mode always moves exactly one row

  // timestamp column first
  if (singleReader) {
    nCopy = TMIN(VTABLE_MERGER_NROWS_PER_BLOCK - dstRow, pInfo->endIdx - pInfo->startIdx);
    code = colDataAssignNRows(pDstTsCol, dstRow, pSrcTsCol, pInfo->startIdx, nCopy);
    QUERY_CHECK_CODE(code, lino, _end);
  } else {
    QUERY_CHECK_CONDITION(!colDataIsNull_s(pSrcTsCol, pInfo->startIdx), code, lino, _end,
                          TSDB_CODE_INVALID_PARA);
    char *pTsVal = colDataGetData(pSrcTsCol, pInfo->startIdx);
    code = colDataSetVal(pDstTsCol, dstRow, pTsVal, false);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  // then every matched column of this reader
  for (int32_t c = 0; c < nCols; ++c) {
    SSTriggerColMatch *pMatch = TARRAY_GET_ELEM(pInfo->pColRef->pNewColMatches, c);
    SColumnInfoData   *pDstCol = taosArrayGet(pDstBlock->pDataBlock, pMatch->vtbSlotId);
    QUERY_CHECK_NULL(pDstCol, code, lino, _end, terrno);
    SColumnInfoData *pSrcCol = taosArrayGet(pSrcBlock->pDataBlock, pMatch->otbSlotId);
    QUERY_CHECK_NULL(pSrcCol, code, lino, _end, terrno);

    if (singleReader) {
      code = colDataAssignNRows(pDstCol, dstRow, pSrcCol, pInfo->startIdx, nCopy);
      QUERY_CHECK_CODE(code, lino, _end);
    } else if (colDataIsNull_s(pSrcCol, pInfo->startIdx)) {
      colDataSetNULL(pDstCol, dstRow);
    } else {
      char *pVal = colDataGetData(pSrcCol, pInfo->startIdx);
      code = colDataSetVal(pDstCol, dstRow, pVal, false);
      QUERY_CHECK_CODE(code, lino, _end);
    }
  }

  // account for the rows that were just moved
  pDstBlock->info.rows = dstRow + nCopy;
  pInfo->startIdx += nCopy;

  if (pInfo->startIdx >= pInfo->endIdx) {
    // the reader's current block is used up; ask it for the next one
    code = stNewTimestampSorterNextDataBlock(pInfo->pReader, NULL, &pInfo->startIdx, &pInfo->endIdx);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  if (!singleReader) {
    // the chosen reader's position changed, so the merge tree has to be adjusted
    code = tMergeTreeAdjust(pMerger->pDataMerger, tMergeTreeGetAdjustIndex(pMerger->pDataMerger));
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (TSDB_CODE_SUCCESS != code) {
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
2023

2024
/*
 * Produce the next output block of the virtual-table merger.
 *
 * The merger's internal block (pMerger->pDataBlock) is reset, pre-filled with NULLs, and then
 * filled by repeatedly calling stNewVtableMergerDoRetrieve() until it reports that nothing more
 * was written. Afterwards pseudo-column values are broadcast to every row and, if a filter is
 * configured, non-matching rows are trimmed away.
 *
 * @param pMerger      merger state; pMerger->inUse must be true
 * @param ppDataBlock  optional output; receives pMerger->pDataBlock when rows are available,
 *                     NULL otherwise
 * @param pStartIdx    output; always 0 once the inUse check has passed
 * @param pEndIdx      output; number of rows in the returned block, 0 when there is no data
 * @return TSDB_CODE_SUCCESS on success (including "no data", signalled by *pEndIdx == 0);
 *         TSDB_CODE_INVALID_PARA when the merger is not in use (outputs are left untouched in
 *         that case); otherwise the error code reported by the failing callee
 */
int32_t stNewVtableMergerNextDataBlock(SSTriggerNewVtableMerger *pMerger, SSDataBlock **ppDataBlock, int32_t *pStartIdx,
                                       int32_t *pEndIdx) {
  int32_t             code = TSDB_CODE_SUCCESS;
  int32_t             lino = 0;
  SStreamTriggerTask *pTask = pMerger->pTask;
  SColumnInfoData    *p = NULL;  // filter result column; released at _end or before a retry

  QUERY_CHECK_CONDITION(pMerger->inUse, code, lino, _end, TSDB_CODE_INVALID_PARA);

  if (ppDataBlock != NULL) {
    *ppDataBlock = NULL;
  }
  *pStartIdx = 0;
  *pEndIdx = 0;

  if (pMerger->nReaders == 0) {
    goto _end;
  }

_retrieve:
  blockDataReset(pMerger->pDataBlock);
  // set all columns to NULL by default
  int32_t ncols = blockDataGetNumOfCols(pMerger->pDataBlock);
  for (int32_t i = 0; i < ncols; i++) {
    SColumnInfoData *pCol = TARRAY_GET_ELEM(pMerger->pDataBlock->pDataBlock, i);
    colDataSetNNULL(pCol, 0, VTABLE_MERGER_NROWS_PER_BLOCK);
  }

  // keep pulling data until a call reports that it wrote nothing
  bool filled = true;
  while (filled) {
    code = stNewVtableMergerDoRetrieve(pMerger, &filled);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  int32_t nrows = blockDataGetNumOfRows(pMerger->pDataBlock);
  if (nrows > 0) {
    if (pMerger->pIsPseudoCol != NULL) {
      // j walks the columns of pPseudoColValues in the order pseudo columns appear in the block
      int32_t j = 0;
      for (int32_t i = 0; i < ncols; i++) {
        if (!pMerger->pIsPseudoCol[i]) {
          continue;
        }
        SColumnInfoData *pSrc = taosArrayGet(pMerger->pPseudoColValues->pDataBlock, j);
        QUERY_CHECK_NULL(pSrc, code, lino, _end, terrno);
        j++;
        SColumnInfoData *pDst = TARRAY_GET_ELEM(pMerger->pDataBlock->pDataBlock, i);
        if (!colDataIsNull_s(pSrc, 0)) {
          if (!IS_VAR_DATA_TYPE(pDst->info.type) && pDst->nullbitmap != NULL) {
            // undo the default NULL marks for the rows about to be filled
            // NOTE(review): presumably needed because colDataCopyNItems does not clear them - confirm
            int32_t bmLen = BitmapLen(nrows);
            memset(pDst->nullbitmap, 0, bmLen);
          }
          // replicate row 0 of the pseudo-column value into all nrows rows
          code = colDataCopyNItems(pDst, 0, colDataGetData(pSrc, 0), nrows, false);
          QUERY_CHECK_CODE(code, lino, _end);
        }
      }
    }
    if (pMerger->pFilter != NULL) {
      int32_t status = 0;
      code = filterExecute(pMerger->pFilter, pMerger->pDataBlock, &p, NULL, ncols, &status);
      QUERY_CHECK_CODE(code, lino, _end);
      code = trimDataBlock(pMerger->pDataBlock, nrows, (bool *)p->pData);
      QUERY_CHECK_CODE(code, lino, _end);
      int32_t nrowsAfterFilter = blockDataGetNumOfRows(pMerger->pDataBlock);
      ST_TASK_DLOG("vtable merger filter applied, before: %d, after: %d", nrows, nrowsAfterFilter);
      if (nrowsAfterFilter == 0 && nrows == VTABLE_MERGER_NROWS_PER_BLOCK) {
        // need to continue retrieve data until at least one row passes the filter.
        // Release this round's filter result first: the next filterExecute() call stores a new
        // column into p, and only the last one is freed at _end, so keeping it would leak.
        colDataDestroy(p);
        taosMemoryFreeClear(p);
        goto _retrieve;
      }
      nrows = nrowsAfterFilter;
    }
  }

  if (nrows == 0) {
    goto _end;
  }

  printDataBlock(pMerger->pDataBlock, __func__, "stream_vtable_data", pTask->task.streamId);

  if (ppDataBlock != NULL) {
    *ppDataBlock = pMerger->pDataBlock;
  }
  *pStartIdx = 0;
  *pEndIdx = nrows;

_end:
  if (p != NULL) {
    colDataDestroy(p);
    taosMemoryFreeClear(p);
  }
  if (TSDB_CODE_SUCCESS != code) {
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc