• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4653

06 Aug 2025 01:23AM UTC coverage: 60.5% (-0.5%) from 60.986%
#4653

push

travis-ci

web-flow
fix: [TD-37190]: disable ignore_nodata_trigger when window type is not interval/sliding or period. (#32405)

138499 of 291677 branches covered (47.48%)

Branch coverage included in aggregate %.

52 of 106 new or added lines in 1 file covered. (49.06%)

1648 existing lines in 119 files now uncovered.

209836 of 284084 relevant lines covered (73.86%)

4732834.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.27
/source/dnode/vnode/src/vnd/vnodeStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "scalar.h"
17
#include "streamReader.h"
18
#include "tdb.h"
19
#include "tencode.h"
20
#include "tglobal.h"
21
#include "tmsg.h"
22
#include "vnd.h"
23
#include "vnode.h"
24
#include "vnodeInt.h"
25

26
#define BUILD_OPTION(options, sStreamInfo, _ver, _groupSort, _order, startTime, endTime, _schemas, _isSchema, _scanMode, \
27
                     _gid, _initReader, _uidList)                                                                        \
28
  SStreamTriggerReaderTaskInnerOptions options = {.suid = sStreamInfo->suid,                                       \
29
                                                  .uid = sStreamInfo->uid,                                         \
30
                                                  .ver = _ver,                                                     \
31
                                                  .tableType = sStreamInfo->tableType,                             \
32
                                                  .groupSort = _groupSort,                                         \
33
                                                  .order = _order,                                                 \
34
                                                  .twindows = {.skey = startTime, .ekey = endTime},                \
35
                                                  .pTagCond = sStreamInfo->pTagCond,                               \
36
                                                  .pTagIndexCond = sStreamInfo->pTagIndexCond,                     \
37
                                                  .pConditions = sStreamInfo->pConditions,                         \
38
                                                  .partitionCols = sStreamInfo->partitionCols,                     \
39
                                                  .schemas = _schemas,                                             \
40
                                                  .isSchema = _isSchema,                                           \
41
                                                  .scanMode = _scanMode,                                           \
42
                                                  .gid = _gid,                                                     \
43
                                                  .initReader = _initReader,                                       \
44
                                                  .uidList = _uidList};
45

46
static int64_t getSessionKey(int64_t session, int64_t type) { return (session | (type << 32)); }
521✔
47

48
static int32_t buildScheamFromMeta(SVnode* pVnode, int64_t uid, SArray** schemas);
49

50
static int32_t addColData(SSDataBlock* pResBlock, int32_t index, void* data) {
40,083✔
51
  SColumnInfoData* pSrc = taosArrayGet(pResBlock->pDataBlock, index);
40,083✔
52
  if (pSrc == NULL) {
40,050!
UNCOV
53
    return terrno;
×
54
  }
55

56
  memcpy(pSrc->pData + pResBlock->info.rows * pSrc->info.bytes, data, pSrc->info.bytes);
40,060✔
57
  return 0;
40,060✔
58
}
59

60
static int32_t getTableDataInfo(SStreamReaderTaskInner* pTask, bool* hasNext) {
1,664✔
61
  int32_t code = pTask->api.tsdReader.tsdNextDataBlock(pTask->pReader, hasNext);
1,664✔
62
  if (code != TSDB_CODE_SUCCESS) {
1,664!
63
    pTask->api.tsdReader.tsdReaderReleaseDataBlock(pTask->pReader);
×
64
  }
65

66
  return code;
1,664✔
67
}
68

69
static int32_t getTableData(SStreamReaderTaskInner* pTask, SSDataBlock** ppRes) {
348✔
70
  return pTask->api.tsdReader.tsdReaderRetrieveDataBlock(pTask->pReader, ppRes, NULL);
348✔
71
}
72

73
static int32_t buildOTableInfoRsp(const SSTriggerOrigTableInfoRsp* rsp, void** data, size_t* size) {
63✔
74
  int32_t code = 0;
63✔
75
  int32_t lino = 0;
63✔
76
  void*   buf = NULL;
63✔
77
  int32_t len = tSerializeSTriggerOrigTableInfoRsp(NULL, 0, rsp);
63✔
78
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
63!
79
  buf = rpcMallocCont(len);
63✔
80
  STREAM_CHECK_NULL_GOTO(buf, terrno);
63!
81
  int32_t actLen = tSerializeSTriggerOrigTableInfoRsp(buf, len, rsp);
63✔
82
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
63!
83
  *data = buf;
63✔
84
  *size = len;
63✔
85
  buf = NULL;
63✔
86
end:
63✔
87
  rpcFreeCont(buf);
63✔
88
  return code;
63✔
89
}
90

91
static int32_t buildVTableInfoRsp(const SStreamMsgVTableInfo* rsp, void** data, size_t* size) {
165✔
92
  int32_t code = 0;
165✔
93
  int32_t lino = 0;
165✔
94
  void*   buf = NULL;
165✔
95
  int32_t len = tSerializeSStreamMsgVTableInfo(NULL, 0, rsp);
165✔
96
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
165!
97
  buf = rpcMallocCont(len);
165✔
98
  STREAM_CHECK_NULL_GOTO(buf, terrno);
165!
99
  int32_t actLen = tSerializeSStreamMsgVTableInfo(buf, len, rsp);
165✔
100
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
165!
101
  *data = buf;
165✔
102
  *size = len;
165✔
103
  buf = NULL;
165✔
104
end:
165✔
105
  rpcFreeCont(buf);
165✔
106
  return code;
165✔
107
}
108

109
static int32_t buildTsRsp(const SStreamTsResponse* tsRsp, void** data, size_t* size) {
500✔
110
  int32_t code = 0;
500✔
111
  int32_t lino = 0;
500✔
112
  void*   buf = NULL;
500✔
113
  int32_t len = tSerializeSStreamTsResponse(NULL, 0, tsRsp);
500✔
114
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
498!
115
  buf = rpcMallocCont(len);
498✔
116
  STREAM_CHECK_NULL_GOTO(buf, terrno);
499!
117
  int32_t actLen = tSerializeSStreamTsResponse(buf, len, tsRsp);
499✔
118
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
500!
119
  *data = buf;
500✔
120
  *size = len;
500✔
121
  buf = NULL;
500✔
122
end:
500✔
123
  rpcFreeCont(buf);
500✔
124
  return code;
500✔
125
}
126

127

128
static int32_t buildRsp(SSDataBlock* pBlock, void** data, size_t* size) {
18,379✔
129
  int32_t code = 0;
18,379✔
130
  int32_t lino = 0;
18,379✔
131
  void*   buf = NULL;
18,379✔
132
  STREAM_CHECK_CONDITION_GOTO(pBlock == NULL || pBlock->info.rows == 0, TSDB_CODE_SUCCESS);
18,379!
133
  size_t dataEncodeSize = blockGetEncodeSize(pBlock);
11,400✔
134
  buf = rpcMallocCont(dataEncodeSize);
11,399✔
135
  STREAM_CHECK_NULL_GOTO(buf, terrno);
11,400!
136
  int32_t actualLen = blockEncode(pBlock, buf, dataEncodeSize, taosArrayGetSize(pBlock->pDataBlock));
11,400✔
137
  STREAM_CHECK_CONDITION_GOTO(actualLen < 0, terrno);
11,403!
138
  *data = buf;
11,403✔
139
  *size = dataEncodeSize;
11,403✔
140
  buf = NULL;
11,403✔
141
end:
18,382✔
142
  rpcFreeCont(buf);
18,382✔
143
  return code;
18,380✔
144
}
145

146
static int32_t resetTsdbReader(SStreamReaderTaskInner* pTask) {
288✔
147
  int32_t        pNum = 1;
288✔
148
  STableKeyInfo* pList = NULL;
288✔
149
  int32_t        code = 0;
288✔
150
  int32_t        lino = 0;
288✔
151
  STREAM_CHECK_RET_GOTO(qStreamGetTableList(pTask->pTableList, pTask->currentGroupIndex, &pList, &pNum));
288!
152
  STREAM_CHECK_RET_GOTO(pTask->api.tsdReader.tsdSetQueryTableList(pTask->pReader, pList, pNum));
289!
153

154
  cleanupQueryTableDataCond(&pTask->cond);
289✔
155
  STREAM_CHECK_RET_GOTO(qStreamInitQueryTableDataCond(&pTask->cond, pTask->options.order, pTask->options.schemas, true,
289!
156
                                                      pTask->options.twindows, pTask->options.suid, pTask->options.ver));
157
  STREAM_CHECK_RET_GOTO(pTask->api.tsdReader.tsdReaderResetStatus(pTask->pReader, &pTask->cond));
289!
158

159
end:
289✔
160
  STREAM_PRINT_LOG_END(code, lino);
289!
161
  return code;
289✔
162
}
163

164
static int32_t buildWalMetaBlock(SSDataBlock* pBlock, int8_t type, int64_t gid, bool isVTable, int64_t uid,
6,234✔
165
                                 int64_t skey, int64_t ekey, int64_t ver, int64_t rows) {
166
  int32_t code = 0;
6,234✔
167
  int32_t lino = 0;
6,234✔
168
  int32_t index = 0;
6,234✔
169
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &type));
6,234!
170
  if (!isVTable) {
6,229✔
171
    STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &gid));
3,439!
172
  }
173
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &uid));
6,218!
174
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &skey));
6,202!
175
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &ekey));
6,190!
176
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &ver));
6,186!
177
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &rows));
6,191!
178

179
end:
6,196✔
180
  STREAM_PRINT_LOG_END(code, lino)
6,196!
181
  return code;
6,194✔
182
}
183

184
static void buildTSchema(STSchema* pTSchema, int32_t ver, col_id_t colId, int8_t type, int32_t bytes) {
6,188✔
185
  pTSchema->numOfCols = 1;
6,188✔
186
  pTSchema->version = ver;
6,188✔
187
  pTSchema->columns[0].colId = colId;
6,188✔
188
  pTSchema->columns[0].type = type;
6,188✔
189
  pTSchema->columns[0].bytes = bytes;
6,188✔
190
}
6,188✔
191

192
int32_t retrieveWalMetaData(SSubmitTbData* pSubmitTbData, void* pTableList, bool isVTable, SSDataBlock* pBlock,
90,983✔
193
                            int64_t ver) {
194
  int32_t code = 0;
90,983✔
195
  int32_t lino = 0;
90,983✔
196

197
  int64_t   uid = pSubmitTbData->uid;
90,983✔
198
  int32_t   numOfRows = 0;
90,983✔
199
  int64_t   skey = 0;
90,983✔
200
  int64_t   ekey = 0;
90,983✔
201
  STSchema* pTSchema = NULL;
90,983✔
202
  uint64_t  gid = 0;
90,983✔
203
  if (isVTable) {
90,983✔
204
    STREAM_CHECK_CONDITION_GOTO(taosHashGet(pTableList, &uid, sizeof(uid)) == NULL, TDB_CODE_SUCCESS);
8,582✔
205
  } else {
206
    STREAM_CHECK_CONDITION_GOTO(!qStreamUidInTableList(pTableList, uid), TDB_CODE_SUCCESS);
82,401✔
207
    gid = qStreamGetGroupId(pTableList, uid);
3,364✔
208
  }
209

210
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
6,187!
211
    SColData* pCol = taosArrayGet(pSubmitTbData->aCol, 0);
×
212
    STREAM_CHECK_NULL_GOTO(pCol, terrno);
×
213
    numOfRows = pCol->nVal;
×
214

215
    SColVal colVal = {0};
×
216
    STREAM_CHECK_RET_GOTO(tColDataGetValue(pCol, 0, &colVal));
×
217
    skey = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
×
218

219
    STREAM_CHECK_RET_GOTO(tColDataGetValue(pCol, numOfRows - 1, &colVal));
×
220
    ekey = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
×
221
  } else {
222
    numOfRows = taosArrayGetSize(pSubmitTbData->aRowP);
6,187✔
223
    SRow* pRow = taosArrayGetP(pSubmitTbData->aRowP, 0);
6,164✔
224
    STREAM_CHECK_NULL_GOTO(pRow, terrno);
6,161!
225
    SColVal colVal = {0};
6,161✔
226
    pTSchema = taosMemoryCalloc(1, sizeof(STSchema) + sizeof(STColumn));
6,161!
227
    STREAM_CHECK_NULL_GOTO(pTSchema, terrno);
6,225!
228
    buildTSchema(pTSchema, pSubmitTbData->sver, PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES);
6,225✔
229
    STREAM_CHECK_RET_GOTO(tRowGet(pRow, pTSchema, 0, &colVal));
6,184!
230
    skey = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
6,174✔
231
    pRow = taosArrayGetP(pSubmitTbData->aRowP, numOfRows - 1);
6,174✔
232
    STREAM_CHECK_RET_GOTO(tRowGet(pRow, pTSchema, 0, &colVal));
6,165!
233
    ekey = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
6,169✔
234
  }
235

236
  STREAM_CHECK_RET_GOTO(buildWalMetaBlock(pBlock, WAL_SUBMIT_DATA, gid, isVTable, uid, skey, ekey, ver, numOfRows));
6,169!
237
  pBlock->info.rows++;
6,122✔
238
  stDebug("stream reader scan submit data:uid %" PRIu64 ", skey %" PRIu64 ", ekey %" PRIu64 ", gid %" PRIu64
6,122✔
239
          ", rows:%d",
240
          uid, skey, ekey, gid, numOfRows);
241

242
end:
3,259✔
243
  taosMemoryFree(pTSchema);
91,100!
244
  STREAM_PRINT_LOG_END(code, lino)
91,150!
245
  return code;
91,152✔
246
}
247

248
int32_t retrieveWalData(SVnode* pVnode, SSubmitTbData* pSubmitTbData, SSDataBlock* pBlock, STimeWindow* window) {
10,246✔
249
  stDebug("stream reader retrieve data block %p", pSubmitTbData);
10,246✔
250
  int32_t code = 0;
10,246✔
251
  int32_t lino = 0;
10,246✔
252

253
  int32_t ver = pSubmitTbData->sver;
10,246✔
254
  int64_t uid = pSubmitTbData->uid;
10,246✔
255
  int32_t numOfRows = 0;
10,246✔
256

257
  STSchema* schemas = metaGetTbTSchema(pVnode->pMeta, uid, ver, 1);
10,246✔
258
  STREAM_CHECK_NULL_GOTO(schemas, TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND);
10,257!
259
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
10,257!
260
    SColData* pCol = taosArrayGet(pSubmitTbData->aCol, 0);
×
261
    STREAM_CHECK_NULL_GOTO(pCol, terrno);
×
262
    int32_t rowStart = 0;
×
263
    int32_t rowEnd = pCol->nVal;
×
264
    if (window != NULL) {
×
265
      SColVal colVal = {0};
×
266
      rowStart = -1;
×
267
      rowEnd = -1;
×
268
      for (int32_t k = 0; k < pCol->nVal; k++) {
×
269
        STREAM_CHECK_RET_GOTO(tColDataGetValue(pCol, k, &colVal));
×
270
        int64_t ts = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
×
271
        if (ts >= window->skey && rowStart == -1) {
×
272
          rowStart = k;
×
273
        }
274
        if (ts > window->ekey && rowEnd == -1) {
×
275
          rowEnd = k;
×
276
        }
277
      }
278
      STREAM_CHECK_CONDITION_GOTO(rowStart == -1 || rowStart == rowEnd, TDB_CODE_SUCCESS);
×
279

280
      if (rowStart != -1 && rowEnd == -1) {
×
281
        rowEnd = pCol->nVal;
×
282
      }
283
    }
284
    numOfRows = rowEnd - rowStart;
×
285
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, numOfRows));
×
286
    for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
×
287
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
×
288
      STREAM_CHECK_NULL_GOTO(pColData, terrno);
×
289
      if (i >= taosArrayGetSize(pSubmitTbData->aCol)) {
×
290
        break;
×
291
      }
292
      for (int32_t j = 0; j < taosArrayGetSize(pSubmitTbData->aCol); j++) {
×
293
        SColData* pCol = taosArrayGet(pSubmitTbData->aCol, j);
×
294
        STREAM_CHECK_NULL_GOTO(pCol, terrno);
×
295
        if (pCol->cid == pColData->info.colId) {
×
296
          for (int32_t k = rowStart; k < rowEnd; k++) {
×
297
            SColVal colVal = {0};
×
298
            STREAM_CHECK_RET_GOTO(tColDataGetValue(pCol, k, &colVal));
×
299
            STREAM_CHECK_RET_GOTO(colDataSetVal(pColData, k, VALUE_GET_DATUM(&colVal.value, colVal.value.type),
×
300
                                                !COL_VAL_IS_VALUE(&colVal)));
301
          }
302
        }
303
      }
304
    }
305
  } else {
306
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, taosArrayGetSize(pSubmitTbData->aRowP)));
10,257!
307
    for (int32_t j = 0; j < taosArrayGetSize(pSubmitTbData->aRowP); j++) {
2,726,923✔
308
      SRow* pRow = taosArrayGetP(pSubmitTbData->aRowP, j);
2,716,668✔
309
      STREAM_CHECK_NULL_GOTO(pRow, terrno);
2,716,671!
310
      SColVal colVal = {0};
2,716,671✔
311
      STREAM_CHECK_RET_GOTO(tRowGet(pRow, schemas, 0, &colVal));
2,716,671!
312
      int64_t ts = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
2,716,672✔
313
      if (window != NULL && (ts < window->skey || ts > window->ekey)) {
2,716,672✔
314
        continue;
43✔
315
      }
316
      for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {  // reader todo test null
12,553,961✔
317
        if (i >= schemas->numOfCols) {
12,551,508✔
318
          break;
2,714,162✔
319
        }
320
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
9,837,346✔
321
        STREAM_CHECK_NULL_GOTO(pColData, terrno);
9,837,346✔
322
        SColVal colVal = {0};
9,837,342✔
323
        int32_t sourceIdx = 0;
9,837,342✔
324
        while (1) {
325
          STREAM_CHECK_RET_GOTO(tRowGet(pRow, schemas, sourceIdx, &colVal));
23,564,360!
326
          if (colVal.cid < pColData->info.colId) {
23,564,347✔
327
            sourceIdx++;
13,727,018✔
328
            continue;
13,727,018✔
329
          } else {
330
            break;
9,837,329✔
331
          }
332
        }
333
        if (colVal.cid == pColData->info.colId && COL_VAL_IS_VALUE(&colVal)) {
9,837,329!
334
          if (IS_VAR_DATA_TYPE(colVal.value.type) || colVal.value.type == TSDB_DATA_TYPE_DECIMAL){
9,836,447!
335
            varColSetVarData(pColData, numOfRows, colVal.value.pData, colVal.value.nData, !COL_VAL_IS_VALUE(&colVal));
25✔
336
          } else {
337
            STREAM_CHECK_RET_GOTO(colDataSetVal(pColData, numOfRows, (const char*)(&(colVal.value.val)), !COL_VAL_IS_VALUE(&colVal)));
9,836,422!
338
          }
339
        } else {
340
          colDataSetNULL(pColData, numOfRows);
882!
341
        }
342
      }
343
      numOfRows++;
2,716,624✔
344
    }
345
  }
346

347
  pBlock->info.rows = numOfRows;
10,263✔
348

349
end:
10,263✔
350
  taosMemoryFree(schemas);
10,263!
351
  STREAM_PRINT_LOG_END(code, lino)
10,263!
352
  return code;
10,259✔
353
}
354

355
static int32_t buildDeleteData(void* pTableList, bool isVTable, SSDataBlock* pBlock, SDeleteRes* req, int64_t uid, int64_t ver){
153✔
356
  int32_t    code = 0;
153✔
357
  int32_t    lino = 0;
153✔
358
  uint64_t   gid = 0;
153✔
359
  stDebug("stream reader scan delete start data:uid %" PRIu64 ", skey %" PRIu64 ", ekey %" PRIu64, uid, req->skey, req->ekey);
153✔
360
  if (isVTable) {
153✔
361
    STREAM_CHECK_CONDITION_GOTO(taosHashGet(pTableList, &uid, sizeof(uid)) == NULL, TDB_CODE_SUCCESS)
42✔
362
  } else {
363
    gid = qStreamGetGroupId(pTableList, uid);
111✔
364
    STREAM_CHECK_CONDITION_GOTO(gid == -1, TDB_CODE_SUCCESS);
111✔
365
  }
366
  STREAM_CHECK_RET_GOTO(
74!
367
      buildWalMetaBlock(pBlock, WAL_DELETE_DATA, gid, isVTable, uid, req->skey, req->ekey, ver, 1));
368
  pBlock->info.rows++;
74✔
369

370
  stDebug("stream reader scan delete end data:uid %" PRIu64 ", skey %" PRIu64 ", ekey %" PRIu64, uid, req->skey, req->ekey);
74✔
371
end:
48✔
372
  return code;
153✔
373
}
374

375
static int32_t scanDeleteData(void* pTableList, bool isVTable, SSDataBlock* pBlock, void* data, int32_t len,
153✔
376
                              int64_t ver) {
377
  int32_t    code = 0;
153✔
378
  int32_t    lino = 0;
153✔
379
  SDecoder   decoder = {0};
153✔
380
  SDeleteRes req = {0};
153✔
381
  req.uidList = taosArrayInit(0, sizeof(tb_uid_t));
153✔
382
  tDecoderInit(&decoder, data, len);
153✔
383
  STREAM_CHECK_RET_GOTO(tDecodeDeleteRes(&decoder, &req));
153!
384
  
385
  for (int32_t i = 0; i < taosArrayGetSize(req.uidList); i++) {
306✔
386
    uint64_t* uid = taosArrayGet(req.uidList, i);
153✔
387
    STREAM_CHECK_NULL_GOTO(uid, terrno);
153!
388
    STREAM_CHECK_RET_GOTO(buildDeleteData(pTableList, isVTable, pBlock, &req, *uid, ver));
153!
389
  }
390

391
end:
153✔
392
  taosArrayDestroy(req.uidList);
153✔
393
  tDecoderClear(&decoder);
153✔
394
  return code;
153✔
395
}
396

397
static int32_t scanDropTable(void* pTableList, bool isVTable, SSDataBlock* pBlock, void* data, int32_t len,
×
398
                             int64_t ver) {
399
  int32_t  code = 0;
×
400
  int32_t  lino = 0;
×
401
  SDecoder decoder = {0};
×
402

403
  SVDropTbBatchReq req = {0};
×
404
  tDecoderInit(&decoder, data, len);
×
405
  STREAM_CHECK_RET_GOTO(tDecodeSVDropTbBatchReq(&decoder, &req));
×
406
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
407
    SVDropTbReq* pDropTbReq = req.pReqs + iReq;
×
408
    STREAM_CHECK_NULL_GOTO(pDropTbReq, TSDB_CODE_INVALID_PARA);
×
409
    uint64_t gid = 0;
×
410
    if (isVTable) {
×
411
      if (taosHashGet(pTableList, &pDropTbReq->uid, sizeof(pDropTbReq->uid)) == NULL) {
×
412
        continue;
×
413
      }
414
    } else {
415
      gid = qStreamGetGroupId(pTableList, pDropTbReq->uid);
×
416
      if (gid == -1) continue;
×
417
    }
418

419
    STREAM_CHECK_RET_GOTO(buildWalMetaBlock(pBlock, WAL_DELETE_TABLE, gid, isVTable, pDropTbReq->uid, 0, 0, ver, 1));
×
420
    pBlock->info.rows++;
×
421
    stDebug("stream reader scan drop :uid %" PRIu64 ", gid %" PRIu64, pDropTbReq->uid, gid);
×
422
  }
423

424
end:
×
425
  tDecoderClear(&decoder);
×
426
  return code;
×
427
}
428

429
static int32_t scanSubmitData(void* pTableList, bool isVTable, SSDataBlock* pBlock, void* data, int32_t len,
91,081✔
430
                              int64_t ver) {
431
  int32_t  code = 0;
91,081✔
432
  int32_t  lino = 0;
91,081✔
433
  SDecoder decoder = {0};
91,081✔
434

435
  SSubmitReq2 submit = {0};
91,081✔
436
  tDecoderInit(&decoder, data, len);
91,081✔
437
  STREAM_CHECK_RET_GOTO(tDecodeSubmitReq(&decoder, &submit, NULL));
91,008!
438

439
  int32_t numOfBlocks = taosArrayGetSize(submit.aSubmitTbData);
91,034✔
440

441
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, pBlock->info.rows + numOfBlocks));
91,006!
442

443
  int32_t nextBlk = -1;
91,038✔
444
  while (++nextBlk < numOfBlocks) {
182,185✔
445
    stDebug("stream reader scan submit, next data block %d/%d", nextBlk, numOfBlocks);
91,029✔
446
    SSubmitTbData* pSubmitTbData = taosArrayGet(submit.aSubmitTbData, nextBlk);
91,029✔
447
    STREAM_CHECK_NULL_GOTO(pSubmitTbData, terrno);
90,993!
448
    STREAM_CHECK_RET_GOTO(retrieveWalMetaData(pSubmitTbData, pTableList, isVTable, pBlock, ver));
90,993!
449
  }
450
end:
91,156✔
451
  tDestroySubmitReq(&submit, TSDB_MSG_FLG_DECODE);
91,156✔
452
  tDecoderClear(&decoder);
91,123✔
453
  return code;
91,181✔
454
}
455

456
static int32_t scanWal(SVnode* pVnode, void* pTableList, bool isVTable, SSDataBlock* pBlock, int64_t lastVer,
7,116✔
457
                       int8_t deleteData, int8_t deleteTb, int64_t ctime, int64_t* retVer) {
458
  int32_t code = 0;
7,116✔
459
  int32_t lino = 0;
7,116✔
460

461
  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
7,116✔
462
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);
7,112!
463
  *retVer = walGetAppliedVer(pWalReader->pWal);
7,112✔
464
  STREAM_CHECK_CONDITION_GOTO(walReaderSeekVer(pWalReader, lastVer + 1) != 0, TSDB_CODE_SUCCESS);
7,121✔
465

466
  while (1) {
92,101✔
467
    *retVer = walGetAppliedVer(pWalReader->pWal);
93,567✔
468
    STREAM_CHECK_CONDITION_GOTO(walNextValidMsg(pWalReader, true) < 0, TSDB_CODE_SUCCESS);
93,542✔
469

470
    SWalCont* wCont = &pWalReader->pHead->head;
92,078✔
471
    if (wCont->ingestTs / 1000 > ctime) break;
92,078!
472
    void*   data = POINTER_SHIFT(wCont->body, sizeof(SMsgHead));
92,078✔
473
    int32_t len = wCont->bodyLen - sizeof(SMsgHead);
92,078✔
474
    int64_t ver = wCont->version;
92,078✔
475

476
    stDebug("vgId:%d stream reader scan wal ver:%" PRId64 ", type:%d, deleteData:%d, deleteTb:%d",
92,078✔
477
      TD_VID(pVnode), ver, wCont->msgType, deleteData, deleteTb);
478
    if (wCont->msgType == TDMT_VND_DELETE && deleteData != 0) {
92,078✔
479
      STREAM_CHECK_RET_GOTO(scanDeleteData(pTableList, isVTable, pBlock, data, len, ver));
153!
480
    } else if (wCont->msgType == TDMT_VND_DROP_TABLE && deleteTb != 0) {
91,925!
481
      STREAM_CHECK_RET_GOTO(scanDropTable(pTableList, isVTable, pBlock, data, len, ver));
×
482
    } else if (wCont->msgType == TDMT_VND_SUBMIT) {
91,925✔
483
      data = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
91,106✔
484
      len = wCont->bodyLen - sizeof(SSubmitReq2Msg);
91,106✔
485
      STREAM_CHECK_RET_GOTO(scanSubmitData(pTableList, isVTable, pBlock, data, len, ver));
91,106!
486
    }
487

488
    if (pBlock->info.rows >= STREAM_RETURN_ROWS_NUM) {
92,101!
489
      break;
×
490
    }
491
  }
492

493
end:
7,117✔
494
  walCloseReader(pWalReader);
7,117✔
495
  STREAM_PRINT_LOG_END(code, lino);
7,114!
496
  return code;
7,119✔
497
}
498

499
int32_t scanWalOneVer(SVnode* pVnode, SSDataBlock* pBlock, SSDataBlock* pBlockRet,
10,264✔
500
                      int64_t ver, int64_t uid, STimeWindow* window) {
501
  int32_t     code = 0;
10,264✔
502
  int32_t     lino = 0;
10,264✔
503
  SSubmitReq2 submit = {0};
10,264✔
504
  SDecoder    decoder = {0};
10,264✔
505

506
  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
10,264✔
507
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);
10,262!
508

509
  STREAM_CHECK_RET_GOTO(walFetchHead(pWalReader, ver));
10,262!
510
  STREAM_CHECK_CONDITION_GOTO(pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT, TSDB_CODE_STREAM_WAL_VER_NOT_DATA);
10,255!
511
  STREAM_CHECK_RET_GOTO(walFetchBody(pWalReader));
10,255!
512
  void*   pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
10,264✔
513
  int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
10,264✔
514

515
  int32_t nextBlk = -1;
10,264✔
516
  tDecoderInit(&decoder, pBody, bodyLen);
10,264✔
517
  STREAM_CHECK_RET_GOTO(tDecodeSubmitReq(&decoder, &submit, NULL));
10,255!
518

519
  int32_t numOfBlocks = taosArrayGetSize(submit.aSubmitTbData);
10,263✔
520
  while (++nextBlk < numOfBlocks) {
20,505✔
521
    stDebug("stream reader next data block %d/%d", nextBlk, numOfBlocks);
10,247✔
522
    SSubmitTbData* pSubmitTbData = taosArrayGet(submit.aSubmitTbData, nextBlk);
10,247✔
523
    STREAM_CHECK_NULL_GOTO(pSubmitTbData, terrno);
10,249!
524
    if (pSubmitTbData->uid != uid) {
10,251!
525
      stDebug("stream reader skip data block uid:%" PRId64, pSubmitTbData->uid);
×
526
      continue;
×
527
    }
528
    STREAM_CHECK_RET_GOTO(retrieveWalData(pVnode, pSubmitTbData, pBlock, window));
10,251!
529
    printDataBlock(pBlock, __func__, "");
10,254✔
530

531
    blockDataMerge(pBlockRet, pBlock);
10,252✔
532
    blockDataCleanup(pBlock);
10,258✔
533
  }
534

535
end:
10,258✔
536
  tDestroySubmitReq(&submit, TSDB_MSG_FLG_DECODE);
10,258✔
537
  walCloseReader(pWalReader);
10,262✔
538
  tDecoderClear(&decoder);
10,267✔
539
  STREAM_PRINT_LOG_END(code, lino);
10,267!
540
  return code;
10,267✔
541
}
542

543
static int32_t processTag(SVnode* pVnode, SExprInfo* pExpr, int32_t numOfExpr, SStorageAPI* api, SSDataBlock* pBlock) {
7,916✔
544
  int32_t     code = 0;
7,916✔
545
  int32_t     lino = 0;
7,916✔
546
  SMetaReader mr = {0};
7,916✔
547

548
  if (numOfExpr == 0) {
7,916!
549
    return TSDB_CODE_SUCCESS;
×
550
  }
551
  api->metaReaderFn.initReader(&mr, pVnode, META_READER_LOCK, &api->metaFn);
7,916✔
552
  code = api->metaReaderFn.getEntryGetUidCache(&mr, pBlock->info.id.uid);
7,917✔
553
  if (code != TSDB_CODE_SUCCESS) {
7,910!
554
    stError("failed to get table meta, uid:%" PRId64 ", code:%s", pBlock->info.id.uid, tstrerror(code));
×
555
  }
556
  api->metaReaderFn.readerReleaseLock(&mr);
7,910✔
557
  for (int32_t j = 0; j < numOfExpr; ++j) {
33,549✔
558
    const SExprInfo* pExpr1 = &pExpr[j];
25,640✔
559
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;
25,640✔
560

561
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
25,640✔
562
    STREAM_CHECK_NULL_GOTO(pColInfoData, terrno);
25,630!
563
    if (mr.me.name == NULL) {
25,631!
564
      colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
×
565
      continue;
×
566
    }
567
    colInfoDataCleanup(pColInfoData, pBlock->info.rows);
25,631✔
568

569
    int32_t functionId = pExpr1->pExpr->_function.functionId;
25,633✔
570

571
    // this is to handle the tbname
572
    if (fmIsScanPseudoColumnFunc(functionId)) {
25,633✔
573
      int32_t fType = pExpr1->pExpr->_function.functionType;
7,913✔
574
      if (fType == FUNCTION_TYPE_TBNAME) {
7,913!
575
        STREAM_CHECK_RET_GOTO(setTbNameColData(pBlock, pColInfoData, functionId, mr.me.name));
7,915!
576
        pColInfoData->info.colId = -1;
7,905✔
577
      }
578
    } else {  // these are tags
579
      STagVal tagVal = {0};
17,721✔
580
      tagVal.cid = pExpr1->base.pParam[0].pCol->colId;
17,721✔
581
      const char* p = api->metaFn.extractTagVal(mr.me.ctbEntry.pTags, pColInfoData->info.type, &tagVal);
17,721✔
582

583
      char* data = NULL;
17,715✔
584
      if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
17,715!
585
        data = tTagValToData((const STagVal*)p, false);
17,720✔
586
      } else {
587
        data = (char*)p;
×
588
      }
589

590
      bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
17,722!
591
      if (isNullVal) {
17,722!
592
        colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
×
593
      } else {
594
        code = colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows, false);
17,722✔
595
        if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
17,720!
596
          taosMemoryFree(data);
10,579!
597
        }
598
        STREAM_CHECK_RET_GOTO(code);
17,728!
599
      }
600
    }
601
  }
602

603
end:
7,909✔
604
  api->metaReaderFn.clearReader(&mr);
7,909✔
605

606
  STREAM_PRINT_LOG_END(code, lino);
7,910!
607
  return code;
7,905✔
608
}
609

610
static int32_t processWalVerData(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamInfo, int64_t ver, bool isTrigger,
7,779✔
611
                                 int64_t uid, STimeWindow* window, SSDataBlock* pSrcBlock, SSDataBlock** pBlock) {
612
  int32_t      code = 0;
7,779✔
613
  int32_t      lino = 0;
7,779✔
614
  SFilterInfo* pFilterInfo = NULL;
7,779✔
615
  SSDataBlock* pBlock1 = NULL;
7,779✔
616
  SSDataBlock* pBlock2 = NULL;
7,779✔
617

618
  SExprInfo*   pExpr = sStreamInfo->pExprInfo;
7,779✔
619
  int32_t      numOfExpr = sStreamInfo->numOfExpr;
7,779✔
620

621
  STREAM_CHECK_RET_GOTO(filterInitFromNode(sStreamInfo->pConditions, &pFilterInfo, 0, NULL));
7,779!
622

623
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamInfo->triggerResBlock, false, &pBlock1));
7,779!
624
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamInfo->triggerResBlock, false, &pBlock2));
7,783!
625
  if (!isTrigger) STREAM_CHECK_RET_GOTO(createOneDataBlock(pSrcBlock, false, pBlock));
7,782!
626

627
  pBlock2->info.id.uid = uid;
7,782✔
628
  pBlock1->info.id.uid = uid;
7,782✔
629

630
  STREAM_CHECK_RET_GOTO(scanWalOneVer(pVnode, pBlock1, pBlock2, ver, uid, window));
7,782!
631

632
  if (pBlock2->info.rows > 0) {
7,785!
633
    SStorageAPI  api = {0};
7,785✔
634
    initStorageAPI(&api);
7,785✔
635
    STREAM_CHECK_RET_GOTO(processTag(pVnode, pExpr, numOfExpr, &api, pBlock2));
7,782!
636
  }
637
  STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock2, pFilterInfo));
7,767!
638
  if (!isTrigger) {
7,778✔
639
    blockDataTransform(*pBlock, pBlock2);
5,629✔
640
  } else {
641
    *pBlock = pBlock2;
2,149✔
642
    pBlock2 = NULL;  
2,149✔
643
  }
644

645
  printDataBlock(*pBlock, __func__, "processWalVerData2");
7,777✔
646

647
end:
7,775✔
648
  STREAM_PRINT_LOG_END(code, lino);
7,775!
649
  filterFreeInfo(pFilterInfo);
7,775✔
650
  blockDataDestroy(pBlock1);
7,783✔
651
  blockDataDestroy(pBlock2);
7,784✔
652
  return code;
7,783✔
653
}
654

655
static int32_t buildScheamFromMeta(SVnode* pVnode, int64_t uid, SArray** schemas) {
2,527✔
656
  int32_t code = 0;
2,527✔
657
  int32_t lino = 0;
2,527✔
658
  SMetaReader metaReader = {0};
2,527✔
659
  SStorageAPI api = {0};
2,527✔
660
  initStorageAPI(&api);
2,527✔
661
  *schemas = taosArrayInit(8, sizeof(SSchema));
2,527✔
662
  STREAM_CHECK_NULL_GOTO(*schemas, terrno);
2,527!
663
  
664
  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
2,527✔
665
  STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, uid));
2,529!
666

667
  SSchemaWrapper* sSchemaWrapper = NULL;
2,525✔
668
  if (metaReader.me.type == TD_CHILD_TABLE) {
2,525✔
669
    int64_t suid = metaReader.me.ctbEntry.suid;
2,391✔
670
    tDecoderClear(&metaReader.coder);
2,391✔
671
    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, suid));
2,395!
672
    sSchemaWrapper = &metaReader.me.stbEntry.schemaRow;
2,390✔
673
  } else if (metaReader.me.type == TD_NORMAL_TABLE) {
134!
674
    sSchemaWrapper = &metaReader.me.ntbEntry.schemaRow;
134✔
675
  } else {
676
    qError("invalid table type:%d", metaReader.me.type);
×
677
  }
678

679
  for (size_t j = 0; j < sSchemaWrapper->nCols; j++) {
10,254✔
680
    SSchema* s = sSchemaWrapper->pSchema + j;
7,728✔
681
    STREAM_CHECK_NULL_GOTO(taosArrayPush(*schemas, s), terrno);
15,458!
682
  }
683

684
end:
2,526✔
685
  api.metaReaderFn.clearReader(&metaReader);
2,526✔
686
  STREAM_PRINT_LOG_END(code, lino);
2,528!
687
  if (code != 0)  taosArrayDestroy(*schemas);
2,528!
688
  return code;
2,528✔
689
}
690

691
static int32_t shrinkScheams(SArray* cols, SArray* schemas) {
2,527✔
692
  int32_t code = 0;
2,527✔
693
  int32_t lino = 0;
2,527✔
694
  for (size_t i = 0; i < taosArrayGetSize(schemas); i++) {
10,260✔
695
    SSchema* s = taosArrayGet(schemas, i);
7,731✔
696
    STREAM_CHECK_NULL_GOTO(s, terrno);
7,735!
697

698
    size_t j = 0;
7,735✔
699
    for (; j < taosArrayGetSize(cols); j++) {
17,510✔
700
      col_id_t* id = taosArrayGet(cols, j);
16,685✔
701
      STREAM_CHECK_NULL_GOTO(id, terrno);
16,684!
702
      if (*id == s->colId) {
16,689✔
703
        break;
6,914✔
704
      }
705
    }
706
    if (j == taosArrayGetSize(cols)) {
7,735✔
707
      // not found, remove it
708
      taosArrayRemove(schemas, i);
823✔
709
      i--;
821✔
710
    }
711
  }
712

713
end:
2,528✔
714
  return code;
2,528✔
715
}
716

717
static int32_t processWalVerDataVTable(SVnode* pVnode, SArray *cids, int64_t ver,
2,479✔
718
  int64_t uid, STimeWindow* window, SSDataBlock** pBlock) {
719
  int32_t      code = 0;
2,479✔
720
  int32_t      lino = 0;
2,479✔
721
  SArray*      schemas = NULL;
2,479✔
722

723
  SSDataBlock* pBlock1 = NULL;
2,479✔
724
  SSDataBlock* pBlock2 = NULL;
2,479✔
725

726
  STREAM_CHECK_RET_GOTO(buildScheamFromMeta(pVnode, uid, &schemas));
2,479!
727
  STREAM_CHECK_RET_GOTO(shrinkScheams(cids, schemas));
2,481!
728
  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, &pBlock1));
2,481!
729
  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, &pBlock2));
2,481!
730

731
  pBlock2->info.id.uid = uid;
2,482✔
732
  pBlock1->info.id.uid = uid;
2,482✔
733

734
  STREAM_CHECK_RET_GOTO(scanWalOneVer(pVnode, pBlock1, pBlock2, ver, uid, window));
2,482!
735
  printDataBlock(pBlock2, __func__, "");
2,482✔
736

737
  *pBlock = pBlock2;
2,482✔
738
  pBlock2 = NULL;
2,482✔
739

740
end:
2,482✔
741
  STREAM_PRINT_LOG_END(code, lino);
2,482!
742
  blockDataDestroy(pBlock1);
2,482✔
743
  blockDataDestroy(pBlock2);
2,482✔
744
  taosArrayDestroy(schemas);
2,482✔
745
  return code;
2,482✔
746
}
747

748
static int32_t createTSAndCondition(int64_t start, int64_t end, SLogicConditionNode** pCond,
361✔
749
                                    STargetNode* pTargetNodeTs) {
750
  int32_t code = 0;
361✔
751
  int32_t lino = 0;
361✔
752

753
  SColumnNode*         pCol = NULL;
361✔
754
  SColumnNode*         pCol1 = NULL;
361✔
755
  SValueNode*          pVal = NULL;
361✔
756
  SValueNode*          pVal1 = NULL;
361✔
757
  SOperatorNode*       op = NULL;
361✔
758
  SOperatorNode*       op1 = NULL;
361✔
759
  SLogicConditionNode* cond = NULL;
361✔
760

761
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pCol));
361!
762
  pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
361✔
763
  pCol->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
361✔
764
  pCol->node.resType.bytes = LONG_BYTES;
361✔
765
  pCol->slotId = pTargetNodeTs->slotId;
361✔
766
  pCol->dataBlockId = pTargetNodeTs->dataBlockId;
361✔
767

768
  STREAM_CHECK_RET_GOTO(nodesCloneNode((SNode*)pCol, (SNode**)&pCol1));
361!
769

770
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&pVal));
361!
771
  pVal->node.resType.type = TSDB_DATA_TYPE_BIGINT;
361✔
772
  pVal->node.resType.bytes = LONG_BYTES;
361✔
773
  pVal->datum.i = start;
361✔
774
  pVal->typeData = start;
361✔
775

776
  STREAM_CHECK_RET_GOTO(nodesCloneNode((SNode*)pVal, (SNode**)&pVal1));
361!
777
  pVal1->datum.i = end;
361✔
778
  pVal1->typeData = end;
361✔
779

780
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&op));
361!
781
  op->opType = OP_TYPE_GREATER_EQUAL;
361✔
782
  op->node.resType.type = TSDB_DATA_TYPE_BOOL;
361✔
783
  op->node.resType.bytes = CHAR_BYTES;
361✔
784
  op->pLeft = (SNode*)pCol;
361✔
785
  op->pRight = (SNode*)pVal;
361✔
786
  pCol = NULL;
361✔
787
  pVal = NULL;
361✔
788

789
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&op1));
361!
790
  op1->opType = OP_TYPE_LOWER_EQUAL;
361✔
791
  op1->node.resType.type = TSDB_DATA_TYPE_BOOL;
361✔
792
  op1->node.resType.bytes = CHAR_BYTES;
361✔
793
  op1->pLeft = (SNode*)pCol1;
361✔
794
  op1->pRight = (SNode*)pVal1;
361✔
795
  pCol1 = NULL;
361✔
796
  pVal1 = NULL;
361✔
797

798
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_LOGIC_CONDITION, (SNode**)&cond));
361!
799
  cond->condType = LOGIC_COND_TYPE_AND;
361✔
800
  cond->node.resType.type = TSDB_DATA_TYPE_BOOL;
361✔
801
  cond->node.resType.bytes = CHAR_BYTES;
361✔
802
  STREAM_CHECK_RET_GOTO(nodesMakeList(&cond->pParameterList));
361!
803
  STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)op));
361!
804
  op = NULL;
361✔
805
  STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)op1));
361!
806
  op1 = NULL;
361✔
807

808
  *pCond = cond;
361✔
809

810
end:
361✔
811
  if (code != 0) {
361!
812
    nodesDestroyNode((SNode*)pCol);
×
813
    nodesDestroyNode((SNode*)pCol1);
×
814
    nodesDestroyNode((SNode*)pVal);
×
815
    nodesDestroyNode((SNode*)pVal1);
×
816
    nodesDestroyNode((SNode*)op);
×
817
    nodesDestroyNode((SNode*)op1);
×
818
    nodesDestroyNode((SNode*)cond);
×
819
  }
820
  STREAM_PRINT_LOG_END(code, lino);
361!
821

822
  return code;
361✔
823
}
824

825
static int32_t createExternalConditions(SStreamRuntimeFuncInfo* data, SLogicConditionNode** pCond, STargetNode* pTargetNodeTs, STimeRangeNode* node) {
114✔
826
  int32_t              code = 0;
114✔
827
  int32_t              lino = 0;
114✔
828
  SLogicConditionNode* pAndCondition = NULL;
114✔
829
  SLogicConditionNode* cond = NULL;
114✔
830

831
  if (pTargetNodeTs == NULL) {
114!
832
    vError("stream reader %s no ts column", __func__);
×
833
    return TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN;
×
834
  }
835
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_LOGIC_CONDITION, (SNode**)&cond));
114!
836
  cond->condType = LOGIC_COND_TYPE_OR;
114✔
837
  cond->node.resType.type = TSDB_DATA_TYPE_BOOL;
114✔
838
  cond->node.resType.bytes = CHAR_BYTES;
114✔
839
  STREAM_CHECK_RET_GOTO(nodesMakeList(&cond->pParameterList));
114!
840

841
  for (int i = 0; i < taosArrayGetSize(data->pStreamPesudoFuncVals); ++i) {
475✔
842
    data->curIdx = i;
361✔
843

844
    SReadHandle handle = {0};
361✔
845
    calcTimeRange(node, data, &handle.winRange, &handle.winRangeValid);
361✔
846
    if (!handle.winRangeValid) {
361!
847
      stError("stream reader %s invalid time range, skey:%" PRId64 ", ekey:%" PRId64, __func__, handle.winRange.skey,
×
848
              handle.winRange.ekey);
849
      continue;
×
850
    }
851
    STREAM_CHECK_RET_GOTO(createTSAndCondition(handle.winRange.skey, handle.winRange.ekey, &pAndCondition, pTargetNodeTs));
361!
852
    stDebug("%s create condition skey:%" PRId64 ", eksy:%" PRId64, __func__, handle.winRange.skey, handle.winRange.ekey);
361✔
853
    STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)pAndCondition));
361!
854
    pAndCondition = NULL;
361✔
855
  }
856

857
  *pCond = cond;
114✔
858

859
end:
114✔
860
  if (code != 0) {
114!
861
    nodesDestroyNode((SNode*)pAndCondition);
×
862
    nodesDestroyNode((SNode*)cond);
×
863
  }
864
  STREAM_PRINT_LOG_END(code, lino);
114!
865

866
  return code;
114✔
867
}
868

869
static int32_t processCalaTimeRange(SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo, SResFetchReq* req,
14,614✔
870
                                    STimeRangeNode* node, SReadHandle* handle) {
871
  int32_t code = 0;
14,614✔
872
  int32_t lino = 0;
14,614✔
873
  SArray* funcVals = NULL;
14,614✔
874
  if (req->pStRtFuncInfo->withExternalWindow) {
14,614✔
875
    nodesDestroyNode(sStreamReaderCalcInfo->tsConditions);
114✔
876
    filterFreeInfo(sStreamReaderCalcInfo->pFilterInfo);
114✔
877
    sStreamReaderCalcInfo->pFilterInfo = NULL;
114✔
878

879
    STREAM_CHECK_RET_GOTO(createExternalConditions(req->pStRtFuncInfo,
114!
880
                                                   (SLogicConditionNode**)&sStreamReaderCalcInfo->tsConditions,
881
                                                   sStreamReaderCalcInfo->pTargetNodeTs, node));
882

883
    STREAM_CHECK_RET_GOTO(filterInitFromNode((SNode*)sStreamReaderCalcInfo->tsConditions,
114!
884
                                             (SFilterInfo**)&sStreamReaderCalcInfo->pFilterInfo,
885
                                             FLT_OPTION_NO_REWRITE | FLT_OPTION_SCALAR_MODE, NULL));
886
    SSTriggerCalcParam* pFirst = taosArrayGet(req->pStRtFuncInfo->pStreamPesudoFuncVals, 0);
114✔
887
    SSTriggerCalcParam* pLast = taosArrayGetLast(req->pStRtFuncInfo->pStreamPesudoFuncVals);
114✔
888
    STREAM_CHECK_NULL_GOTO(pFirst, terrno);
114!
889
    STREAM_CHECK_NULL_GOTO(pLast, terrno);
114!
890

891
    handle->winRange.skey = pFirst->wstart;
114✔
892
    handle->winRange.ekey = pLast->wend;
114✔
893
    handle->winRangeValid = true;
114✔
894
    stDebug("%s withExternalWindow is true, skey:%" PRId64 ", ekey:%" PRId64, __func__, pFirst->wstart, pLast->wend);
114✔
895
  } else {
896
    calcTimeRange(node, req->pStRtFuncInfo, &handle->winRange, &handle->winRangeValid);
14,500✔
897
  }
898

899
end:
14,614✔
900
  taosArrayDestroy(funcVals);
14,614✔
901
  return code;
14,614✔
902
}
903

904
static int32_t createBlockForWalMeta(SSDataBlock** pBlock, bool isVTable) {
7,100✔
905
  int32_t code = 0;
7,100✔
906
  int32_t lino = 0;
7,100✔
907
  SArray* schemas = NULL;
7,100✔
908

909
  schemas = taosArrayInit(8, sizeof(SSchema));
7,100✔
910
  STREAM_CHECK_NULL_GOTO(schemas, terrno);
7,107!
911

912
  int32_t index = 0;
7,107✔
913
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TINYINT, CHAR_BYTES, index++))  // type
7,107!
914
  if (!isVTable) {
7,112✔
915
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // gid
6,035!
916
  }
917
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // uid
7,115!
918
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // skey
7,108!
919
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // ekey
7,107!
920
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // ver
7,107!
921
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // nrows
7,106!
922

923
  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, pBlock));
7,107!
924

925
end:
7,116✔
926
  taosArrayDestroy(schemas);
7,116✔
927
  return code;
7,124✔
928
}
929

930
static int32_t createOptionsForLastTs(SStreamTriggerReaderTaskInnerOptions* options,
337✔
931
                                      SStreamTriggerReaderInfo*             sStreamReaderInfo) {
932
  int32_t code = 0;
337✔
933
  int32_t lino = 0;
337✔
934
  SArray* schemas = NULL;
337✔
935

936
  schemas = taosArrayInit(4, sizeof(SSchema));
337✔
937
  STREAM_CHECK_NULL_GOTO(schemas, terrno)
339!
938
  STREAM_CHECK_RET_GOTO(
339!
939
      qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, PRIMARYKEY_TIMESTAMP_COL_ID))  // last ts
940

941
  BUILD_OPTION(op, sStreamReaderInfo, -1, true, TSDB_ORDER_DESC, INT64_MIN, INT64_MAX, schemas, true,
339✔
942
               STREAM_SCAN_GROUP_ONE_BY_ONE, 0, sStreamReaderInfo->uidList == NULL, NULL);
943
  schemas = NULL;
339✔
944
  *options = op;
339✔
945

946
end:
339✔
947
  taosArrayDestroy(schemas);
339✔
948
  return code;
339✔
949
}
950

951
static int32_t createOptionsForFirstTs(SStreamTriggerReaderTaskInnerOptions* options,
160✔
952
                                       SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t start, int64_t ver) {
953
  int32_t code = 0;
160✔
954
  int32_t lino = 0;
160✔
955
  SArray* schemas = NULL;
160✔
956

957
  schemas = taosArrayInit(4, sizeof(SSchema));
160✔
958
  STREAM_CHECK_NULL_GOTO(schemas, terrno)
160!
959
  STREAM_CHECK_RET_GOTO(
160!
960
      qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, PRIMARYKEY_TIMESTAMP_COL_ID))  // first ts
961

962
  BUILD_OPTION(op, sStreamReaderInfo, ver, true, TSDB_ORDER_ASC, start, INT64_MAX, schemas, true,
160✔
963
               STREAM_SCAN_GROUP_ONE_BY_ONE, 0, sStreamReaderInfo->uidList == NULL, NULL);
964
  schemas = NULL;
160✔
965

966
  *options = op;
160✔
967
end:
160✔
968
  taosArrayDestroy(schemas);
160✔
969
  return code;
160✔
970
}
971

972
static int32_t createOptionsForTsdbMeta(SStreamTriggerReaderTaskInnerOptions* options,
172✔
973
                                        SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t start, int64_t end,
974
                                        int64_t gid, int8_t order, int64_t ver, bool onlyTs) {
975
  int32_t code = 0;
172✔
976
  int32_t lino = 0;
172✔
977
  SArray* schemas = NULL;
172✔
978

979
  int32_t index = 1;
172✔
980
  schemas = taosArrayInit(8, sizeof(SSchema));
172✔
981
  STREAM_CHECK_NULL_GOTO(schemas, terrno);
172!
982
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, index++))  // skey
172!
983
  if (!onlyTs){
172✔
984
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, index++))  // ekey
86!
985
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // uid
86!
986
    if (sStreamReaderInfo->uidList == NULL) {
86✔
987
      STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_UBIGINT, LONG_BYTES, index++))  // gid
8!
988
    }
989
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))     // nrows
86!
990
  }
991
  
992
  BUILD_OPTION(op, sStreamReaderInfo, ver, true, order, start, end, schemas, true, (gid != 0 ? STREAM_SCAN_GROUP_ONE_BY_ONE : STREAM_SCAN_ALL), gid,
172✔
993
               true, sStreamReaderInfo->uidList);
994
  schemas = NULL;
172✔
995
  *options = op;
172✔
996

997
end:
172✔
998
  taosArrayDestroy(schemas);
172✔
999
  return code;
172✔
1000
}
1001

1002
static int taosCompareInt64Asc(const void* elem1, const void* elem2) {
54✔
1003
  int64_t* node1 = (int64_t*)elem1;
54✔
1004
  int64_t* node2 = (int64_t*)elem2;
54✔
1005

1006
  if (*node1 < *node2) {
54!
1007
    return -1;
×
1008
  }
1009

1010
  return *node1 > *node2;
54✔
1011
}
1012

1013
static int32_t getTableList(SArray** pList, int32_t* pNum, int64_t* suid, int32_t index,
67✔
1014
                            SStreamTriggerReaderInfo* sStreamReaderInfo) {
1015
  int32_t code = 0;
67✔
1016
  int32_t lino = 0;
67✔
1017

1018
  int32_t* start = taosArrayGet(sStreamReaderInfo->uidListIndex, index);
67✔
1019
  STREAM_CHECK_NULL_GOTO(start, terrno);
67!
1020
  int32_t  iStart = *start;
67✔
1021
  int32_t* end = taosArrayGet(sStreamReaderInfo->uidListIndex, index + 1);
67✔
1022
  STREAM_CHECK_NULL_GOTO(end, terrno);
67!
1023
  int32_t iEnd = *end;
67✔
1024
  *pList = taosArrayInit(iEnd - iStart, sizeof(STableKeyInfo));
67✔
1025
  STREAM_CHECK_NULL_GOTO(*pList, terrno);
67!
1026
  for (int32_t i = iStart; i < iEnd; ++i) {
183✔
1027
    int64_t*       uid = taosArrayGet(sStreamReaderInfo->uidList, i);
116✔
1028
    STableKeyInfo* info = taosArrayReserve(*pList, 1);
116✔
1029
    STREAM_CHECK_NULL_GOTO(info, terrno);
116!
1030
    *suid = uid[0];
116✔
1031
    info->uid = uid[1];
116✔
1032
  }
1033
  *pNum = iEnd - iStart;
67✔
1034
end:
67✔
1035
  STREAM_PRINT_LOG_END(code, lino);
67!
1036
  return code;
67✔
1037
}
1038

1039
static int32_t processTsNonVTable(SVnode* pVnode, SStreamTsResponse* tsRsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
433✔
1040
                                  SStreamReaderTaskInner* pTask) {
1041
  int32_t code = 0;
433✔
1042
  int32_t lino = 0;
433✔
1043

1044
  tsRsp->tsInfo = taosArrayInit(qStreamGetTableListGroupNum(pTask->pTableList), sizeof(STsInfo));
433✔
1045
  STREAM_CHECK_NULL_GOTO(tsRsp->tsInfo, terrno);
433!
1046
  while (true) {
289✔
1047
    bool hasNext = false;
722✔
1048
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTask, &hasNext));
722!
1049
    if (!hasNext) {
722✔
1050
      break;
433✔
1051
    }
1052
    pTask->api.tsdReader.tsdReaderReleaseDataBlock(pTask->pReader);
479✔
1053
    STsInfo* tsInfo = taosArrayReserve(tsRsp->tsInfo, 1);
479✔
1054
    STREAM_CHECK_NULL_GOTO(tsInfo, terrno)
479!
1055
    if (pTask->options.order == TSDB_ORDER_ASC) {
479✔
1056
      tsInfo->ts = pTask->pResBlock->info.window.skey;
355✔
1057
    } else {
1058
      tsInfo->ts = pTask->pResBlock->info.window.ekey;
124✔
1059
    }
1060
    tsInfo->gId = qStreamGetGroupId(pTask->pTableList, pTask->pResBlock->info.id.uid);
479✔
1061
    stDebug("vgId:%d %s get last ts:%" PRId64 ", gId:%" PRIu64 ", ver:%" PRId64, TD_VID(pVnode), __func__, tsInfo->ts,
478✔
1062
            tsInfo->gId, tsRsp->ver);
1063

1064
    pTask->currentGroupIndex++;
478✔
1065
    if (pTask->currentGroupIndex >= qStreamGetTableListGroupNum(pTask->pTableList)) {
478✔
1066
      break;
190✔
1067
    }
1068
    STREAM_CHECK_RET_GOTO(resetTsdbReader(pTask));
288!
1069
  }
1070

1071
end:
433✔
1072
  return code;
433✔
1073
}
1074

1075
static int32_t processTsVTable(SVnode* pVnode, SStreamTsResponse* tsRsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
67✔
1076
                               SStreamReaderTaskInner* pTask, int64_t ver) {
1077
  int32_t code = 0;
67✔
1078
  int32_t lino = 0;
67✔
1079
  int64_t suid = 0;
67✔
1080
  SArray* pList = NULL;
67✔
1081
  int32_t pNum = 0;
67✔
1082

1083
  tsRsp->tsInfo = taosArrayInit(taosArrayGetSize(sStreamReaderInfo->uidList), sizeof(STsInfo));
67✔
1084
  STREAM_CHECK_NULL_GOTO(tsRsp->tsInfo, terrno);
67!
1085
  for (int32_t i = 0; i < taosArrayGetSize(sStreamReaderInfo->uidListIndex) - 1; ++i) {
134✔
1086
    STREAM_CHECK_RET_GOTO(getTableList(&pList, &pNum, &suid, i, sStreamReaderInfo));
67!
1087

1088
    cleanupQueryTableDataCond(&pTask->cond);
67✔
1089
    STREAM_CHECK_RET_GOTO(qStreamInitQueryTableDataCond(&pTask->cond, pTask->options.order, pTask->options.schemas,
67!
1090
                                                        pTask->options.isSchema, pTask->options.twindows, suid, ver));
1091
    STREAM_CHECK_RET_GOTO(pTask->api.tsdReader.tsdReaderOpen(
67!
1092
        pVnode, &pTask->cond, taosArrayGet(pList, 0), pNum, pTask->pResBlock, (void**)&pTask->pReader, pTask->idStr, NULL));
1093
    taosArrayDestroy(pList);
67✔
1094
    pList = NULL;
67✔
1095
    while (true) {
52✔
1096
      bool hasNext = false;
119✔
1097
      STREAM_CHECK_RET_GOTO(getTableDataInfo(pTask, &hasNext));
119!
1098
      if (!hasNext) {
119✔
1099
        break;
67✔
1100
      }
1101
      pTask->api.tsdReader.tsdReaderReleaseDataBlock(pTask->pReader);
52✔
1102
      STsInfo* tsInfo = taosArrayReserve(tsRsp->tsInfo, 1);
52✔
1103
      STREAM_CHECK_NULL_GOTO(tsInfo, terrno)
52!
1104
      if (pTask->options.order == TSDB_ORDER_ASC) {
52✔
1105
        tsInfo->ts = pTask->pResBlock->info.window.skey;
13✔
1106
      } else {
1107
        tsInfo->ts = pTask->pResBlock->info.window.ekey;
39✔
1108
      }
1109
      tsInfo->gId = pTask->pResBlock->info.id.uid;
52✔
1110
      stDebug("vgId:%d %s get vtable last ts:%" PRId64 ", uid:%" PRIu64 ", ver:%" PRId64, TD_VID(pVnode), __func__,
52✔
1111
              tsInfo->ts, tsInfo->gId, tsRsp->ver);
1112
    }
1113
    pTask->api.tsdReader.tsdReaderClose(pTask->pReader);
67✔
1114
    pTask->pReader = NULL;
67✔
1115
  }
1116

1117
end:
67✔
1118
  taosArrayDestroy(pList);
67✔
1119
  return code;
67✔
1120
}
1121

1122
static void reSetUid(SStreamTriggerReaderTaskInnerOptions* options, int64_t suid, int64_t uid) {
55✔
1123
  if (suid != 0) options->suid = suid;
55✔
1124
  options->uid = uid;
55✔
1125
  if (options->suid != 0) {
55!
1126
    options->tableType = TD_CHILD_TABLE;
55✔
1127
  } else {
1128
    options->tableType = TD_NORMAL_TABLE;
×
1129
  }
1130
}
55✔
1131

1132
static int32_t createOptionsForTsdbData(SVnode* pVnode, SStreamTriggerReaderTaskInnerOptions* options,
47✔
1133
                                        SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t uid, SArray* cols,
1134
                                        int8_t order,int64_t skey, int64_t ekey, int64_t ver) {
1135
  int32_t code = 0;
47✔
1136
  int32_t lino = 0;
47✔
1137
  SArray* schemas = NULL;
47✔
1138

1139
  STREAM_CHECK_RET_GOTO(buildScheamFromMeta(pVnode, uid, &schemas));
47!
1140
  STREAM_CHECK_RET_GOTO(shrinkScheams(cols, schemas));
47!
1141
  BUILD_OPTION(op, sStreamReaderInfo, ver, true, order, skey, ekey, schemas, true, STREAM_SCAN_ALL, 0, false, NULL);
47✔
1142
  *options = op;
47✔
1143

1144
end:
47✔
1145
  return code;
47✔
1146
}
1147

1148
static int32_t vnodeProcessStreamSetTableReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
63✔
1149
  int32_t code = 0;
63✔
1150
  int32_t lino = 0;
63✔
1151
  void*   buf = NULL;
63✔
1152
  size_t  size = 0;
63✔
1153
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
63!
1154
  void* pTask = sStreamReaderInfo->pTask;
63✔
1155

1156
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
63✔
1157

1158
  TSWAP(sStreamReaderInfo->uidList, req->setTableReq.uids);
63✔
1159
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidList, TSDB_CODE_INVALID_PARA);
63!
1160

1161
  taosArraySort(sStreamReaderInfo->uidList, taosCompareInt64Asc);
63✔
1162
  if (sStreamReaderInfo->uidListIndex != NULL) {
63!
1163
    taosArrayClear(sStreamReaderInfo->uidListIndex);
×
1164
  } else {
1165
    sStreamReaderInfo->uidListIndex = taosArrayInit(taosArrayGetSize(sStreamReaderInfo->uidList), sizeof(int32_t));
63✔
1166
    STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidListIndex, TSDB_CODE_INVALID_PARA);
63!
1167
  }
1168

1169
  if (sStreamReaderInfo->uidHash != NULL) {
63!
1170
    taosHashClear(sStreamReaderInfo->uidHash);
×
1171
  } else {
1172
    sStreamReaderInfo->uidHash = taosHashInit(taosArrayGetSize(sStreamReaderInfo->uidList),
63✔
1173
                                              taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
1174
    STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidHash, TSDB_CODE_INVALID_PARA);
63!
1175
  }
1176

1177
  int64_t suid = 0;
63✔
1178
  int32_t cnt = taosArrayGetSize(sStreamReaderInfo->uidList);
63✔
1179
  for (int32_t i = 0; i < cnt; ++i) {
170✔
1180
    int64_t* data = taosArrayGet(sStreamReaderInfo->uidList, i);
107✔
1181
    STREAM_CHECK_NULL_GOTO(data, terrno);
107!
1182
    if (*data == 0 || *data != suid) {
107✔
1183
      STREAM_CHECK_NULL_GOTO(taosArrayPush(sStreamReaderInfo->uidListIndex, &i), terrno);
126!
1184
    }
1185
    suid = *data;
107✔
1186
    stDebug("vgId:%d %s suid:%" PRId64 ",uid:%" PRId64 ", index:%d", TD_VID(pVnode), __func__, suid, data[1], i);
107✔
1187
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->uidHash, data + 1, LONG_BYTES, &i, sizeof(int32_t)));
107!
1188
  }
1189
  STREAM_CHECK_NULL_GOTO(taosArrayPush(sStreamReaderInfo->uidListIndex, &cnt), terrno);
126!
1190

1191
end:
63✔
1192
  STREAM_PRINT_LOG_END_WITHID(code, lino);
63!
1193
  SRpcMsg rsp = {
63✔
1194
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1195
  tmsgSendRsp(&rsp);
63✔
1196
  return code;
63✔
1197
}
1198

1199
static int32_t vnodeProcessStreamLastTsReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
337✔
1200
  int32_t                 code = 0;
337✔
1201
  int32_t                 lino = 0;
337✔
1202
  SArray*                 schemas = NULL;
337✔
1203
  SStreamReaderTaskInner* pTaskInner = NULL;
337✔
1204
  SStreamTsResponse       lastTsRsp = {0};
337✔
1205
  void*                   buf = NULL;
337✔
1206
  size_t                  size = 0;
337✔
1207

1208
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
337!
1209
  void* pTask = sStreamReaderInfo->pTask;
337✔
1210

1211
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
337✔
1212

1213
  SStreamTriggerReaderTaskInnerOptions options = {0};
337✔
1214
  STREAM_CHECK_RET_GOTO(createOptionsForLastTs(&options, sStreamReaderInfo));
337!
1215
  SStorageAPI api = {0};
339✔
1216
  initStorageAPI(&api);
339✔
1217
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, NULL, &api));
340!
1218

1219
  lastTsRsp.ver = pVnode->state.applied;
340✔
1220
  if (sStreamReaderInfo->uidList != NULL) {
340✔
1221
    STREAM_CHECK_RET_GOTO(processTsVTable(pVnode, &lastTsRsp, sStreamReaderInfo, pTaskInner, -1));
61!
1222
  } else {
1223
    STREAM_CHECK_RET_GOTO(processTsNonVTable(pVnode, &lastTsRsp, sStreamReaderInfo, pTaskInner));
279!
1224
  }
1225
  ST_TASK_DLOG("vgId:%d %s get result", TD_VID(pVnode), __func__);
340✔
1226
  STREAM_CHECK_RET_GOTO(buildTsRsp(&lastTsRsp, &buf, &size))
340!
1227

1228
end:
340✔
1229
  STREAM_PRINT_LOG_END_WITHID(code, lino);
340!
1230
  SRpcMsg rsp = {
340✔
1231
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1232
  tmsgSendRsp(&rsp);
340✔
1233
  taosArrayDestroy(lastTsRsp.tsInfo);
340✔
1234
  releaseStreamTask(&pTaskInner);
340✔
1235
  return code;
339✔
1236
}
1237

1238
static int32_t vnodeProcessStreamFirstTsReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
160✔
1239
  int32_t                 code = 0;
160✔
1240
  int32_t                 lino = 0;
160✔
1241
  SStreamReaderTaskInner* pTaskInner = NULL;
160✔
1242
  SStreamTsResponse       firstTsRsp = {0};
160✔
1243
  void*                   buf = NULL;
160✔
1244
  size_t                  size = 0;
160✔
1245

1246
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
160!
1247
  void* pTask = sStreamReaderInfo->pTask;
160✔
1248
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
160✔
1249
  SStreamTriggerReaderTaskInnerOptions options = {0};
160✔
1250
  STREAM_CHECK_RET_GOTO(createOptionsForFirstTs(&options, sStreamReaderInfo, req->firstTsReq.startTime, req->firstTsReq.ver));
160!
1251
  SStorageAPI api = {0};
160✔
1252
  initStorageAPI(&api);
160✔
1253
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, NULL, &api));
160!
1254
  
1255
  firstTsRsp.ver = pVnode->state.applied;
160✔
1256
  if (sStreamReaderInfo->uidList != NULL) {
160✔
1257
    STREAM_CHECK_RET_GOTO(processTsVTable(pVnode, &firstTsRsp, sStreamReaderInfo, pTaskInner, req->firstTsReq.ver));
6!
1258
  } else {
1259
    STREAM_CHECK_RET_GOTO(processTsNonVTable(pVnode, &firstTsRsp, sStreamReaderInfo, pTaskInner));
154!
1260
  }
1261

1262
  ST_TASK_DLOG("vgId:%d %s get result", TD_VID(pVnode), __func__);
160✔
1263
  STREAM_CHECK_RET_GOTO(buildTsRsp(&firstTsRsp, &buf, &size));
160!
1264

1265
end:
160✔
1266
  STREAM_PRINT_LOG_END_WITHID(code, lino);
160!
1267
  SRpcMsg rsp = {
160✔
1268
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1269
  tmsgSendRsp(&rsp);
160✔
1270
  taosArrayDestroy(firstTsRsp.tsInfo);
160✔
1271
  releaseStreamTask(&pTaskInner);
160✔
1272
  return code;
160✔
1273
}
1274

1275
static int32_t vnodeProcessStreamTsdbMetaReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
86✔
1276
  int32_t code = 0;
86✔
1277
  int32_t lino = 0;
86✔
1278
  void*   buf = NULL;
86✔
1279
  size_t  size = 0;
86✔
1280
  SStreamTriggerReaderTaskInnerOptions options = {0};
86✔
1281

1282
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
86!
1283
  void* pTask = sStreamReaderInfo->pTask;
86✔
1284
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
86✔
1285

1286
  SStreamReaderTaskInner* pTaskInner = NULL;
86✔
1287
  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_META);
86✔
1288

1289
  if (req->base.type == STRIGGER_PULL_TSDB_META) {
86!
1290
    SStreamTriggerReaderTaskInnerOptions optionsTs = {0};
86✔
1291

1292
    STREAM_CHECK_RET_GOTO(createOptionsForTsdbMeta(&optionsTs, sStreamReaderInfo, req->tsdbMetaReq.startTime,
86!
1293
      req->tsdbMetaReq.endTime, req->tsdbMetaReq.gid, req->tsdbMetaReq.order, req->tsdbMetaReq.ver, true));
1294
    SStorageAPI api = {0};
86✔
1295
    initStorageAPI(&api);
86✔
1296
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &optionsTs, &pTaskInner, NULL, sStreamReaderInfo->groupIdMap, &api));
86!
1297
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
86!
1298
    
1299
    STREAM_CHECK_RET_GOTO(createOptionsForTsdbMeta(&options, sStreamReaderInfo, req->tsdbMetaReq.startTime,
86!
1300
      req->tsdbMetaReq.endTime, req->tsdbMetaReq.gid, req->tsdbMetaReq.order, req->tsdbMetaReq.ver, false));
1301
    STREAM_CHECK_RET_GOTO(createDataBlockForStream(options.schemas, &pTaskInner->pResBlockDst));
86!
1302
  } else {
1303
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
×
1304
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
×
1305
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
×
1306
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
×
1307
  }
1308

1309
  pTaskInner->pResBlockDst->info.rows = 0;
86✔
1310
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pTaskInner->pResBlockDst, STREAM_RETURN_ROWS_NUM));
86!
1311
  bool hasNext = true;
86✔
1312
  while (true) {
26✔
1313
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
112!
1314
    if (!hasNext) {
112✔
1315
      break;
86✔
1316
    }
1317
    pTaskInner->api.tsdReader.tsdReaderReleaseDataBlock(pTaskInner->pReader);
26✔
1318
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
26✔
1319

1320
    int32_t index = 0;
26✔
1321
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.window.skey));
26!
1322
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.window.ekey));
26!
1323
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.id.uid));
26!
1324
    if (sStreamReaderInfo->uidList == NULL) {
26✔
1325
      STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.id.groupId));
8!
1326
    }
1327
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.rows));
26!
1328

1329
    stDebug("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
26✔
1330
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
1331
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
1332
            pTaskInner->pResBlockDst->info.rows++;
26✔
1333
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
26!
1334
      break;
×
1335
    }
1336
  }
1337

1338
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
86✔
1339
  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
86!
1340
  if (!hasNext) {
86!
1341
    taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
86✔
1342
  }
1343

1344
end:
×
1345
  STREAM_PRINT_LOG_END_WITHID(code, lino);
86!
1346
  SRpcMsg rsp = {
86✔
1347
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1348
  tmsgSendRsp(&rsp);
86✔
1349
  taosArrayDestroy(options.schemas);
86✔
1350
  return code;
86✔
1351
}
1352

1353
static int32_t vnodeProcessStreamTsdbTsDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
8✔
1354
  int32_t                 code = 0;
8✔
1355
  int32_t                 lino = 0;
8✔
1356
  SStreamReaderTaskInner* pTaskInner = NULL;
8✔
1357
  void*                   buf = NULL;
8✔
1358
  size_t                  size = 0;
8✔
1359
  SSDataBlock*            pBlockRes = NULL;
8✔
1360

1361
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
8!
1362
  void* pTask = sStreamReaderInfo->pTask;
8✔
1363
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
8!
1364

1365
  BUILD_OPTION(options, sStreamReaderInfo, req->tsdbTsDataReq.ver, true, TSDB_ORDER_ASC, req->tsdbTsDataReq.skey, req->tsdbTsDataReq.ekey,
8✔
1366
               sStreamReaderInfo->triggerCols, false, STREAM_SCAN_ALL, 0, true, NULL);
1367
  reSetUid(&options, req->tsdbTsDataReq.suid, req->tsdbTsDataReq.uid);
8✔
1368
  SStorageAPI api = {0};
8✔
1369
  initStorageAPI(&api);
8✔
1370
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, NULL, &api));
8!
1371
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
8!
1372
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->tsBlock, false, &pBlockRes));
8!
1373

1374
  while (1) {
8✔
1375
    bool hasNext = false;
16✔
1376
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
16!
1377
    if (!hasNext) {
16✔
1378
      break;
8✔
1379
    }
1380
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
8✔
1381

1382
    SSDataBlock* pBlock = NULL;
8✔
1383
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
8!
1384
    if (pBlock != NULL && pBlock->info.rows > 0) {
8!
1385
      STREAM_CHECK_RET_GOTO(processTag(pVnode, sStreamReaderInfo->pExprInfo, sStreamReaderInfo->numOfExpr, &api, pBlock));
8!
1386
    }
1387
    
1388
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, pTaskInner->pFilterInfo));
8!
1389
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
8!
1390
    ST_TASK_DLOG("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
8!
1391
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
1392
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
1393
  }
1394

1395
  blockDataTransform(pBlockRes, pTaskInner->pResBlockDst);
8✔
1396

1397
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
8!
1398
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));
8!
1399

1400
end:
8✔
1401
  STREAM_PRINT_LOG_END_WITHID(code, lino);
8!
1402
  SRpcMsg rsp = {
8✔
1403
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1404
  tmsgSendRsp(&rsp);
8✔
1405
  blockDataDestroy(pBlockRes);
8✔
1406

1407
  releaseStreamTask(&pTaskInner);
8✔
1408
  return code;
8✔
1409
}
1410

1411
static int32_t vnodeProcessStreamTsdbTriggerDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
254✔
1412
  int32_t code = 0;
254✔
1413
  int32_t lino = 0;
254✔
1414
  void*   buf = NULL;
254✔
1415
  size_t  size = 0;
254✔
1416

1417
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
254!
1418
  SStreamReaderTaskInner* pTaskInner = NULL;
254✔
1419
  void* pTask = sStreamReaderInfo->pTask;
254✔
1420
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
254✔
1421
  
1422
  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_TRIGGER_DATA);
254✔
1423

1424
  if (req->base.type == STRIGGER_PULL_TSDB_TRIGGER_DATA) {
254✔
1425
    BUILD_OPTION(options, sStreamReaderInfo, req->tsdbTriggerDataReq.ver, true, req->tsdbTriggerDataReq.order, req->tsdbTriggerDataReq.startTime, INT64_MAX,
127✔
1426
                 sStreamReaderInfo->triggerCols, false, (req->tsdbTriggerDataReq.gid != 0 ? STREAM_SCAN_GROUP_ONE_BY_ONE : STREAM_SCAN_ALL), 
1427
                 req->tsdbTriggerDataReq.gid, true, NULL);
1428
    SStorageAPI api = {0};
127✔
1429
    initStorageAPI(&api);
127✔
1430
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock,
127!
1431
                                           sStreamReaderInfo->groupIdMap, &api));
1432

1433
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
127!
1434

1435
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
127!
1436

1437
  } else {
1438
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
127✔
1439
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
127!
1440
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
127✔
1441
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
127!
1442
  }
1443

1444
  pTaskInner->pResBlockDst->info.rows = 0;
254✔
1445
  bool hasNext = true;
254✔
1446
  while (1) {
×
1447
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
254!
1448
    if (!hasNext) {
254✔
1449
      break;
127✔
1450
    }
1451
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
127✔
1452
    pTaskInner->pResBlockDst->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
127✔
1453

1454
    SSDataBlock* pBlock = NULL;
127✔
1455
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
127!
1456
    if (pBlock != NULL && pBlock->info.rows > 0) {
127!
1457
      STREAM_CHECK_RET_GOTO(
127!
1458
        processTag(pVnode, sStreamReaderInfo->pExprInfo, sStreamReaderInfo->numOfExpr, &pTaskInner->api, pBlock));
1459
    }
1460
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, pTaskInner->pFilterInfo));
127!
1461
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
127!
1462
    ST_TASK_DLOG("vgId:%d %s get skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
127✔
1463
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
1464
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
1465
    if (pTaskInner->pResBlockDst->info.rows > 0) {
127!
1466
      break;
127✔
1467
    }
1468
  }
1469

1470
  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
254!
1471
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
254✔
1472
  if (!hasNext) {
254✔
1473
    taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
127✔
1474
  }
1475

1476
end:
127✔
1477
  STREAM_PRINT_LOG_END_WITHID(code, lino);
254!
1478
  SRpcMsg rsp = {
254✔
1479
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1480
  tmsgSendRsp(&rsp);
254✔
1481

1482
  return code;
254✔
1483
}
1484

1485
static int32_t vnodeProcessStreamTsdbCalcDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
181✔
1486
  int32_t code = 0;
181✔
1487
  int32_t lino = 0;
181✔
1488
  void*   buf = NULL;
181✔
1489
  size_t  size = 0;
181✔
1490
  SSDataBlock*            pBlockRes = NULL;
181✔
1491

1492
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
181!
1493
  void* pTask = sStreamReaderInfo->pTask;
181✔
1494
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
181✔
1495

1496
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->triggerCols, TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN);
181!
1497

1498
  SStreamReaderTaskInner* pTaskInner = NULL;
181✔
1499
  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_CALC_DATA);
181✔
1500

1501
  if (req->base.type == STRIGGER_PULL_TSDB_CALC_DATA) {
181!
1502
    BUILD_OPTION(options, sStreamReaderInfo, req->tsdbCalcDataReq.ver, true, TSDB_ORDER_ASC, req->tsdbCalcDataReq.skey, req->tsdbCalcDataReq.ekey,
181✔
1503
                 sStreamReaderInfo->triggerCols, false, STREAM_SCAN_GROUP_ONE_BY_ONE, req->tsdbCalcDataReq.gid, true, NULL);
1504
    SStorageAPI api = {0};
181✔
1505
    initStorageAPI(&api);
181✔
1506
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, NULL, &api));
181!
1507

1508
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
181!
1509
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
181!
1510
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->calcResBlock, false, &pBlockRes));
181!
1511
  } else {
1512
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
×
1513
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
×
1514
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
×
1515
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
×
1516
  }
1517

1518
  pTaskInner->pResBlockDst->info.rows = 0;
181✔
1519
  bool hasNext = true;
181✔
1520
  while (1) {
166✔
1521
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
347!
1522
    if (!hasNext) {
347✔
1523
      break;
181✔
1524
    }
1525
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
166✔
1526

1527
    SSDataBlock* pBlock = NULL;
166✔
1528
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
166!
1529
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, pTaskInner->pFilterInfo));
166!
1530
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
166!
1531
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
166!
1532
      break;
×
1533
    }
1534
  }
1535
  blockDataTransform(pBlockRes, pTaskInner->pResBlockDst);
181✔
1536
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));
181!
1537
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlockRes->info.rows);
181✔
1538
  if (!hasNext) {
181!
1539
    taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
181✔
1540
  }
1541

1542
end:
×
1543
  STREAM_PRINT_LOG_END_WITHID(code, lino);
181!
1544
  SRpcMsg rsp = {
181✔
1545
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1546
  tmsgSendRsp(&rsp);
181✔
1547
  blockDataDestroy(pBlockRes);
181✔
1548
  return code;
181✔
1549
}
1550

1551
static int32_t vnodeProcessStreamTsdbVirtalDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
47✔
1552
  int32_t code = 0;
47✔
1553
  int32_t lino = 0;
47✔
1554
  void*   buf = NULL;
47✔
1555
  size_t  size = 0;
47✔
1556

1557
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
47!
1558
  void* pTask = sStreamReaderInfo->pTask;
47✔
1559
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
47✔
1560

1561
  SStreamReaderTaskInner* pTaskInner = NULL;
47✔
1562
  int64_t key = req->tsdbDataReq.uid;
47✔
1563

1564
  if (req->base.type == STRIGGER_PULL_TSDB_DATA) {
47!
1565
    SStreamTriggerReaderTaskInnerOptions options = {0};
47✔
1566

1567
    STREAM_CHECK_RET_GOTO(createOptionsForTsdbData(pVnode, &options, sStreamReaderInfo, req->tsdbDataReq.uid,
47!
1568
                                                   req->tsdbDataReq.cids, req->tsdbDataReq.order, req->tsdbDataReq.skey,
1569
                                                   req->tsdbDataReq.ekey, req->tsdbDataReq.ver));
1570
    reSetUid(&options, req->tsdbDataReq.suid, req->tsdbDataReq.uid);
47✔
1571

1572
    SStorageAPI api = {0};
47✔
1573
    initStorageAPI(&api);
47✔
1574
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, NULL, &api));
47!
1575

1576
    STableKeyInfo       keyInfo = {.uid = req->tsdbDataReq.uid};
47✔
1577
    cleanupQueryTableDataCond(&pTaskInner->cond);
47✔
1578
    STREAM_CHECK_RET_GOTO(qStreamInitQueryTableDataCond(&pTaskInner->cond, pTaskInner->options.order, pTaskInner->options.schemas,
47!
1579
                                                        pTaskInner->options.isSchema, pTaskInner->options.twindows,
1580
                                                        pTaskInner->options.suid, pTaskInner->options.ver));
1581
    STREAM_CHECK_RET_GOTO(pTaskInner->api.tsdReader.tsdReaderOpen(pVnode, &pTaskInner->cond, &keyInfo, 1, pTaskInner->pResBlock,
47!
1582
                                                             (void**)&pTaskInner->pReader, pTaskInner->idStr, NULL));
1583

1584
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
47!
1585
    STREAM_CHECK_RET_GOTO(createOneDataBlock(pTaskInner->pResBlock, false, &pTaskInner->pResBlockDst));
47!
1586
  } else {
1587
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
×
1588
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
×
1589
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
×
1590
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
×
1591
  }
1592

1593
  pTaskInner->pResBlockDst->info.rows = 0;
47✔
1594
  bool hasNext = true;
47✔
1595
  while (1) {
47✔
1596
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
94!
1597
    if (!hasNext) {
94✔
1598
      break;
47✔
1599
    }
1600

1601
    SSDataBlock* pBlock = NULL;
47✔
1602
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
47!
1603
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
47!
1604
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
47!
1605
      break;
×
1606
    }
1607
  }
1608
  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
47!
1609
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
47✔
1610
  if (!hasNext) {
47!
1611
    taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
47✔
1612
  }
1613

1614
end:
×
1615
  STREAM_PRINT_LOG_END_WITHID(code, lino);
47!
1616
  SRpcMsg rsp = {
47✔
1617
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1618
  tmsgSendRsp(&rsp);
47✔
1619

1620
  return code;
47✔
1621
}
1622

1623
static int32_t vnodeProcessStreamWalMetaReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
7,121✔
1624
  int32_t      code = 0;
7,121✔
1625
  int32_t      lino = 0;
7,121✔
1626
  void*        buf = NULL;
7,121✔
1627
  size_t       size = 0;
7,121✔
1628
  SSDataBlock* pBlock = NULL;
7,121✔
1629
  void*        pTableList = NULL;
7,121✔
1630
  SNodeList*   groupNew = NULL;
7,121✔
1631
  int64_t      lastVer = 0;
7,121✔
1632

1633
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
7,121✔
1634
  void* pTask = sStreamReaderInfo->pTask;
7,096✔
1635
  ST_TASK_DLOG("vgId:%d %s start, request paras lastVer:%" PRId64 ",ctime:%" PRId64, TD_VID(pVnode), __func__,
7,096✔
1636
  req->walMetaReq.lastVer, req->walMetaReq.ctime);
1637

1638
  bool isVTable = sStreamReaderInfo->uidList != NULL;
7,096✔
1639
  if (sStreamReaderInfo->uidList == NULL) {
7,096✔
1640
    SStorageAPI api = {0};
6,032✔
1641
    initStorageAPI(&api);
6,032✔
1642
    STREAM_CHECK_RET_GOTO(nodesCloneList(sStreamReaderInfo->partitionCols, &groupNew));
6,036!
1643
    STREAM_CHECK_RET_GOTO(qStreamCreateTableListForReader(
6,037!
1644
        pVnode, sStreamReaderInfo->suid, sStreamReaderInfo->uid, sStreamReaderInfo->tableType,
1645
        groupNew, false, sStreamReaderInfo->pTagCond, sStreamReaderInfo->pTagIndexCond, &api,
1646
        &pTableList, sStreamReaderInfo->groupIdMap));
1647
  }
1648

1649
  STREAM_CHECK_RET_GOTO(createBlockForWalMeta(&pBlock, isVTable));
7,086!
1650
  STREAM_CHECK_RET_GOTO(scanWal(pVnode, isVTable ? sStreamReaderInfo->uidHash : pTableList, isVTable, pBlock,
7,119!
1651
                                req->walMetaReq.lastVer, sStreamReaderInfo->deleteReCalc,
1652
                                sStreamReaderInfo->deleteOutTbl, req->walMetaReq.ctime, &lastVer));
1653

1654
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlock->info.rows);
7,119✔
1655
  STREAM_CHECK_RET_GOTO(buildRsp(pBlock, &buf, &size));
7,119!
1656
  printDataBlock(pBlock, __func__, "");
7,111✔
1657

1658
end:
7,137✔
1659
  if (pBlock != NULL && pBlock->info.rows == 0) {
7,137✔
1660
    code = TSDB_CODE_STREAM_NO_DATA;
6,682✔
1661
    buf = rpcMallocCont(sizeof(int64_t));
6,682✔
1662
    *(int64_t *)buf = lastVer;
6,684✔
1663
    size = sizeof(int64_t);
6,684✔
1664
  }
1665
  SRpcMsg rsp = {
7,139✔
1666
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1667
  tmsgSendRsp(&rsp);
7,139✔
1668
  if (code == TSDB_CODE_STREAM_NO_DATA){
7,152✔
1669
    code = 0;
6,693✔
1670
  }
1671
  STREAM_PRINT_LOG_END_WITHID(code, lino);
7,152!
1672
  nodesDestroyList(groupNew);
7,152✔
1673
  blockDataDestroy(pBlock);
7,146✔
1674
  qStreamDestroyTableList(pTableList);
7,152✔
1675

1676
  return code;
7,150✔
1677
}
1678

1679
static int32_t vnodeProcessStreamWalDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
10,259✔
1680
  int32_t      code = 0;
10,259✔
1681
  int32_t      lino = 0;
10,259✔
1682
  void*        buf = NULL;
10,259✔
1683
  size_t       size = 0;
10,259✔
1684
  SSDataBlock* pBlock = NULL;
10,259✔
1685

1686
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
10,259!
1687
  void* pTask = sStreamReaderInfo->pTask;
10,259✔
1688
  ST_TASK_DLOG("vgId:%d %s start, request type:%d skey:%" PRId64 ",ekey:%" PRId64 ",uid:%" PRId64 ",ver:%" PRId64, TD_VID(pVnode),
10,259✔
1689
  __func__, req->walDataReq.base.type, req->walDataReq.skey, req->walDataReq.ekey, req->walDataReq.uid, req->walDataReq.ver);
1690

1691
  STimeWindow window = {.skey = req->walDataReq.skey, .ekey = req->walDataReq.ekey};
10,262✔
1692
  if (req->walDataReq.base.type == STRIGGER_PULL_WAL_DATA){
10,262✔
1693
    STREAM_CHECK_RET_GOTO(processWalVerDataVTable(pVnode, req->walDataReq.cids, req->walDataReq.ver,
2,481!
1694
      req->walDataReq.uid, &window, &pBlock));
1695
  } else if (req->walDataReq.base.type == STRIGGER_PULL_WAL_CALC_DATA){
7,781✔
1696
    STREAM_CHECK_RET_GOTO(processWalVerData(pVnode, sStreamReaderInfo, req->walDataReq.ver, false,
5,274!
1697
      req->walDataReq.uid, &window, sStreamReaderInfo->calcResBlock, &pBlock));
1698
  } else if (req->walDataReq.base.type == STRIGGER_PULL_WAL_TRIGGER_DATA) {
2,507✔
1699
    STREAM_CHECK_RET_GOTO(processWalVerData(pVnode, sStreamReaderInfo, req->walDataReq.ver, true,
2,153!
1700
      req->walDataReq.uid, &window, sStreamReaderInfo->triggerResBlock, &pBlock));
1701
  } else if (req->walDataReq.base.type == STRIGGER_PULL_WAL_TS_DATA){
354!
1702
    STREAM_CHECK_RET_GOTO(processWalVerData(pVnode, sStreamReaderInfo, req->walDataReq.ver, false,
354!
1703
      req->walDataReq.uid, &window, sStreamReaderInfo->tsBlock, &pBlock));
1704

1705
  }
1706
  
1707
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlock->info.rows);
10,258✔
1708
  printDataBlock(pBlock, __func__, "");
10,258✔
1709
  STREAM_CHECK_RET_GOTO(buildRsp(pBlock, &buf, &size));
10,256!
1710
end:
10,254✔
1711
  STREAM_PRINT_LOG_END_WITHID(code, lino);
10,254!
1712
  SRpcMsg rsp = {
10,254✔
1713
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1714
  tmsgSendRsp(&rsp);
10,254✔
1715

1716
  blockDataDestroy(pBlock);
10,261✔
1717
  return code;
10,266✔
1718
}
1719

1720
static int32_t vnodeProcessStreamGroupColValueReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
499✔
1721
  int32_t code = 0;
499✔
1722
  int32_t lino = 0;
499✔
1723
  void*   buf = NULL;
499✔
1724
  size_t  size = 0;
499✔
1725

1726
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
499!
1727
  void* pTask = sStreamReaderInfo->pTask;
499✔
1728
  ST_TASK_DLOG("vgId:%d %s start, request gid:%" PRId64, TD_VID(pVnode), __func__, req->groupColValueReq.gid);
499✔
1729

1730
  SArray** gInfo = taosHashGet(sStreamReaderInfo->groupIdMap, &req->groupColValueReq.gid, POINTER_BYTES);
499✔
1731
  STREAM_CHECK_NULL_GOTO(gInfo, TSDB_CODE_STREAM_NO_CONTEXT);
499!
1732
  SStreamGroupInfo pGroupInfo = {0};
499✔
1733
  pGroupInfo.gInfo = *gInfo;
499✔
1734

1735
  size = tSerializeSStreamGroupInfo(NULL, 0, &pGroupInfo, TD_VID(pVnode));
499✔
1736
  STREAM_CHECK_CONDITION_GOTO(size < 0, size);
1737
  buf = rpcMallocCont(size);
499✔
1738
  STREAM_CHECK_NULL_GOTO(buf, terrno);
499!
1739
  size = tSerializeSStreamGroupInfo(buf, size, &pGroupInfo, TD_VID(pVnode));
499✔
1740
  STREAM_CHECK_CONDITION_GOTO(size < 0, size);
1741
end:
499✔
1742
  if (code != 0) {
499!
1743
    rpcFreeCont(buf);
×
1744
    buf = NULL;
×
1745
    size = 0;
×
1746
  }
1747
  STREAM_PRINT_LOG_END_WITHID(code, lino);
499!
1748
  SRpcMsg rsp = {
499✔
1749
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1750
  tmsgSendRsp(&rsp);
499✔
1751

1752
  return code;
499✔
1753
}
1754

1755
static int32_t vnodeProcessStreamVTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
165✔
1756
  int32_t              code = 0;
165✔
1757
  int32_t              lino = 0;
165✔
1758
  void*                buf = NULL;
165✔
1759
  size_t               size = 0;
165✔
1760
  SStreamMsgVTableInfo vTableInfo = {0};
165✔
1761
  SMetaReader          metaReader = {0};
165✔
1762
  SNodeList*           groupNew = NULL;
165✔
1763
  void*                pTableList = NULL;
165✔
1764
  SStorageAPI api = {0};
165✔
1765
  initStorageAPI(&api);
165✔
1766

1767
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
165!
1768
  void* pTask = sStreamReaderInfo->pTask;
165✔
1769
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
165✔
1770

1771
  STREAM_CHECK_RET_GOTO(nodesCloneList(sStreamReaderInfo->partitionCols, &groupNew));
165!
1772
  STREAM_CHECK_RET_GOTO(qStreamCreateTableListForReader(
165!
1773
      pVnode, sStreamReaderInfo->suid, sStreamReaderInfo->uid, sStreamReaderInfo->tableType,
1774
      groupNew, true, sStreamReaderInfo->pTagCond, sStreamReaderInfo->pTagIndexCond, &api,
1775
      &pTableList, sStreamReaderInfo->groupIdMap));
1776

1777
  SArray* cids = req->virTableInfoReq.cids;
165✔
1778
  STREAM_CHECK_NULL_GOTO(cids, terrno);
165!
1779

1780
  SArray* pTableListArray = qStreamGetTableArrayList(pTableList);
165✔
1781
  STREAM_CHECK_NULL_GOTO(pTableListArray, terrno);
165!
1782

1783
  vTableInfo.infos = taosArrayInit(taosArrayGetSize(pTableListArray), sizeof(VTableInfo));
165✔
1784
  STREAM_CHECK_NULL_GOTO(vTableInfo.infos, terrno);
165!
1785
  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
165✔
1786

1787
  for (size_t i = 0; i < taosArrayGetSize(pTableListArray); i++) {
434✔
1788
    STableKeyInfo* pKeyInfo = taosArrayGet(pTableListArray, i);
269✔
1789
    if (pKeyInfo == NULL) {
269!
1790
      continue;
×
1791
    }
1792
    VTableInfo* vTable = taosArrayReserve(vTableInfo.infos, 1);
269✔
1793
    STREAM_CHECK_NULL_GOTO(vTable, terrno);
269!
1794
    vTable->uid = pKeyInfo->uid;
269✔
1795
    vTable->gId = pKeyInfo->groupId;
269✔
1796

1797
    code = api.metaReaderFn.getTableEntryByUid(&metaReader, pKeyInfo->uid);
269✔
1798
    if (taosArrayGetSize(cids) == 1 && *(col_id_t*)taosArrayGet(cids, 0) == PRIMARYKEY_TIMESTAMP_COL_ID){
269!
1799
      vTable->cols.nCols = metaReader.me.colRef.nCols;
×
1800
      vTable->cols.version = metaReader.me.colRef.version;
×
1801
      vTable->cols.pColRef = taosMemoryCalloc(metaReader.me.colRef.nCols, sizeof(SColRef));
×
1802
      for (size_t j = 0; j < metaReader.me.colRef.nCols; j++) {
×
1803
        memcpy(vTable->cols.pColRef + j, &metaReader.me.colRef.pColRef[j], sizeof(SColRef));
×
1804
      }
1805
    } else {
1806
      vTable->cols.nCols = taosArrayGetSize(cids);
269✔
1807
      vTable->cols.version = metaReader.me.colRef.version;
269✔
1808
      vTable->cols.pColRef = taosMemoryCalloc(taosArrayGetSize(cids), sizeof(SColRef));
269!
1809
      for (size_t i = 0; i < taosArrayGetSize(cids); i++) {
1,121✔
1810
        for (size_t j = 0; j < metaReader.me.colRef.nCols; j++) {
2,894✔
1811
          if (metaReader.me.colRef.pColRef[j].hasRef &&
2,581✔
1812
              metaReader.me.colRef.pColRef[j].id == *(col_id_t*)taosArrayGet(cids, i)) {
1,609✔
1813
            memcpy(vTable->cols.pColRef + i, &metaReader.me.colRef.pColRef[j], sizeof(SColRef));
539✔
1814
            break;
539✔
1815
          }
1816
        }
1817
      }
1818
    }
1819
    tDecoderClear(&metaReader.coder);
269✔
1820
  }
1821
  ST_TASK_DLOG("vgId:%d %s end", TD_VID(pVnode), __func__);
165✔
1822
  STREAM_CHECK_RET_GOTO(buildVTableInfoRsp(&vTableInfo, &buf, &size));
165!
1823

1824
end:
165✔
1825
  nodesDestroyList(groupNew);
165✔
1826
  qStreamDestroyTableList(pTableList);
165✔
1827
  tDestroySStreamMsgVTableInfo(&vTableInfo);
165✔
1828
  api.metaReaderFn.clearReader(&metaReader);
165✔
1829
  STREAM_PRINT_LOG_END_WITHID(code, lino);
165!
1830
  SRpcMsg rsp = {
165✔
1831
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1832
  tmsgSendRsp(&rsp);
165✔
1833
  return code;
165✔
1834
}
1835

1836
static int32_t vnodeProcessStreamOTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req) {
64✔
1837
  int32_t                   code = 0;
64✔
1838
  int32_t                   lino = 0;
64✔
1839
  void*                     buf = NULL;
64✔
1840
  size_t                    size = 0;
64✔
1841
  SSTriggerOrigTableInfoRsp oTableInfo = {0};
64✔
1842
  SMetaReader               metaReader = {0};
64✔
1843
  int64_t streamId = req->base.streamId;
64✔
1844
  stsDebug("vgId:%d %s start", TD_VID(pVnode), __func__);
64✔
1845

1846
  SStorageAPI api = {0};
64✔
1847
  initStorageAPI(&api);
64✔
1848

1849
  SArray* cols = req->origTableInfoReq.cols;
64✔
1850
  STREAM_CHECK_NULL_GOTO(cols, terrno);
64!
1851

1852
  oTableInfo.cols = taosArrayInit(taosArrayGetSize(cols), sizeof(OTableInfoRsp));
64✔
1853

1854
  STREAM_CHECK_NULL_GOTO(oTableInfo.cols, terrno);
64!
1855

1856
  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
64✔
1857
  for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
266✔
1858
    OTableInfo*    oInfo = taosArrayGet(cols, i);
203✔
1859
    OTableInfoRsp* vTableInfo = taosArrayReserve(oTableInfo.cols, 1);
203✔
1860
    STREAM_CHECK_NULL_GOTO(oInfo, terrno);
203!
1861
    STREAM_CHECK_NULL_GOTO(vTableInfo, terrno);
203!
1862
    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByName(&metaReader, oInfo->refTableName));
203✔
1863
    vTableInfo->uid = metaReader.me.uid;
203✔
1864
    stsDebug("vgId:%d %s uid:%"PRId64, TD_VID(pVnode), __func__, vTableInfo->uid);
203✔
1865

1866
    SSchemaWrapper* sSchemaWrapper = NULL;
203✔
1867
    if (metaReader.me.type == TD_CHILD_TABLE) {
203✔
1868
      int64_t suid = metaReader.me.ctbEntry.suid;
186✔
1869
      vTableInfo->suid = suid;
186✔
1870
      tDecoderClear(&metaReader.coder);
186✔
1871
      STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, suid));
186!
1872
      sSchemaWrapper = &metaReader.me.stbEntry.schemaRow;
186✔
1873
    } else if (metaReader.me.type == TD_NORMAL_TABLE) {
17!
1874
      vTableInfo->suid = 0;
17✔
1875
      sSchemaWrapper = &metaReader.me.ntbEntry.schemaRow;
17✔
1876
    } else {
1877
      stError("invalid table type:%d", metaReader.me.type);
×
1878
    }
1879

1880
    for (size_t j = 0; j < sSchemaWrapper->nCols; j++) {
573!
1881
      SSchema* s = sSchemaWrapper->pSchema + j;
573✔
1882
      if (strcmp(s->name, oInfo->refColName) == 0) {
573✔
1883
        vTableInfo->cid = s->colId;
203✔
1884
        break;
203✔
1885
      }
1886
    }
1887
    if (vTableInfo->cid == 0) {
203!
1888
      stError("vgId:%d %s, not found col %s in table %s", TD_VID(pVnode), __func__, oInfo->refColName,
×
1889
              oInfo->refTableName);
1890
    }
1891
    tDecoderClear(&metaReader.coder);
203✔
1892
  }
1893

1894
  STREAM_CHECK_RET_GOTO(buildOTableInfoRsp(&oTableInfo, &buf, &size));
63!
1895

1896
end:
63✔
1897
  tDestroySTriggerOrigTableInfoRsp(&oTableInfo);
64✔
1898
  api.metaReaderFn.clearReader(&metaReader);
64✔
1899
  STREAM_PRINT_LOG_END(code, lino);
64!
1900
  SRpcMsg rsp = {
64✔
1901
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1902
  tmsgSendRsp(&rsp);
64✔
1903
  return code;
64✔
1904
}
1905

1906
static int32_t vnodeProcessStreamVTableTagInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req) {
442✔
1907
  int32_t                   code = 0;
442✔
1908
  int32_t                   lino = 0;
442✔
1909
  void*                     buf = NULL;
442✔
1910
  size_t                    size = 0;
442✔
1911
  SSDataBlock* pBlock = NULL;
442✔
1912

1913
  SMetaReader               metaReader = {0};
442✔
1914
  SMetaReader               metaReaderStable = {0};
442✔
1915
  int64_t streamId = req->base.streamId;
442✔
1916
  stsDebug("vgId:%d %s start", TD_VID(pVnode), __func__);
442✔
1917

1918
  SStorageAPI api = {0};
442✔
1919
  initStorageAPI(&api);
442✔
1920

1921
  SArray* cols = req->virTablePseudoColReq.cids;
442✔
1922
  STREAM_CHECK_NULL_GOTO(cols, terrno);
442!
1923

1924
  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
442✔
1925
  STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, req->virTablePseudoColReq.uid));
442!
1926

1927
  STREAM_CHECK_CONDITION_GOTO(metaReader.me.type != TD_VIRTUAL_CHILD_TABLE && metaReader.me.type != TD_VIRTUAL_NORMAL_TABLE, TSDB_CODE_INVALID_PARA);
442!
1928

1929
  STREAM_CHECK_RET_GOTO(createDataBlock(&pBlock));
442!
1930
  if (metaReader.me.type == TD_VIRTUAL_NORMAL_TABLE) {
442✔
1931
    STREAM_CHECK_CONDITION_GOTO (taosArrayGetSize(cols) < 1 && *(col_id_t*)taosArrayGet(cols, 0) != -1, TSDB_CODE_INVALID_PARA);
17!
1932
    SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN, -1);
17✔
1933
    STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
17!
1934
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, 1));
17!
1935
    pBlock->info.rows = 1;
17✔
1936
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, 0);
17✔
1937
    STREAM_CHECK_NULL_GOTO(pDst, terrno);
17!
1938
    STREAM_CHECK_RET_GOTO(varColSetVarData(pDst, 0, metaReader.me.name, strlen(metaReader.me.name), false));
17!
1939
  } else if (metaReader.me.type == TD_VIRTUAL_CHILD_TABLE){
425!
1940
    int64_t suid = metaReader.me.ctbEntry.suid;
425✔
1941
    api.metaReaderFn.readerReleaseLock(&metaReader);
425✔
1942
    api.metaReaderFn.initReader(&metaReaderStable, pVnode, META_READER_LOCK, &api.metaFn);
425✔
1943

1944
    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReaderStable, suid));
425!
1945
    SSchemaWrapper*  sSchemaWrapper = &metaReaderStable.me.stbEntry.schemaTag;
425✔
1946
    for (size_t i = 0; i < taosArrayGetSize(cols); i++){
1,491✔
1947
      col_id_t* id = taosArrayGet(cols, i);
1,066✔
1948
      STREAM_CHECK_NULL_GOTO(id, terrno);
1,066!
1949
      if (*id == -1) {
1,066✔
1950
        SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN, -1);
425✔
1951
        STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
425!
1952
        continue;
425✔
1953
      }
1954
      size_t j = 0;
641✔
1955
      for (; j < sSchemaWrapper->nCols; j++) {
1,235!
1956
        SSchema* s = sSchemaWrapper->pSchema + j;
1,235✔
1957
        if (s->colId == *id) {
1,235✔
1958
          SColumnInfoData idata = createColumnInfoData(s->type, s->bytes, s->colId);
641✔
1959
          STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
641!
1960
          break;
641✔
1961
        }
1962
      }
1963
      if (j == sSchemaWrapper->nCols) {
641!
1964
        SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_NULL, CHAR_BYTES, *id);
×
1965
        STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
×
1966
      }
1967
    }
1968
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, 1));
425!
1969
    pBlock->info.rows = 1;
425✔
1970
    
1971
    for (size_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++){
1,491✔
1972
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
1,066✔
1973
      STREAM_CHECK_NULL_GOTO(pDst, terrno);
1,066!
1974

1975
      if (pDst->info.colId == -1) {
1,066✔
1976
        STREAM_CHECK_RET_GOTO(varColSetVarData(pDst, 0, metaReader.me.name, strlen(metaReader.me.name), false));
425!
1977
        continue;
425✔
1978
      }
1979
      if (pDst->info.type == TSDB_DATA_TYPE_NULL) {
641!
1980
        STREAM_CHECK_RET_GOTO(colDataSetVal(pDst, 0, NULL, true));
×
1981
        continue;
×
1982
      }
1983

1984
      STagVal val = {0};
641✔
1985
      val.cid = pDst->info.colId;
641✔
1986
      const char* p = api.metaFn.extractTagVal(metaReader.me.ctbEntry.pTags, pDst->info.type, &val);
641✔
1987

1988
      char* data = NULL;
641✔
1989
      if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
641!
1990
        data = tTagValToData((const STagVal*)p, false);
605✔
1991
      } else {
1992
        data = (char*)p;
36✔
1993
      }
1994

1995
      STREAM_CHECK_RET_GOTO(colDataSetVal(pDst, 0, data,
641!
1996
                            (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data))));
1997

1998
      if ((pDst->info.type != TSDB_DATA_TYPE_JSON) && (p != NULL) && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
641!
1999
          (data != NULL)) {
2000
        taosMemoryFree(data);
108!
2001
      }
2002
    }
2003
  } else {
2004
    stError("vgId:%d %s, invalid table type:%d", TD_VID(pVnode), __func__, metaReader.me.type);
×
2005
    code = TSDB_CODE_INVALID_PARA;
×
2006
    goto end;
×
2007
  }
2008
  
2009
  stsDebug("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlock->info.rows);
442✔
2010
  printDataBlock(pBlock, __func__, "");
442✔
2011
  STREAM_CHECK_RET_GOTO(buildRsp(pBlock, &buf, &size));
442!
2012

2013
end:
442✔
2014
  api.metaReaderFn.clearReader(&metaReaderStable);
442✔
2015
  api.metaReaderFn.clearReader(&metaReader);
442✔
2016
  STREAM_PRINT_LOG_END(code, lino);
442!
2017
  SRpcMsg rsp = {
442✔
2018
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
2019
  tmsgSendRsp(&rsp);
442✔
2020
  blockDataDestroy(pBlock);
442✔
2021
  return code;
442✔
2022
}
2023

2024
static int32_t vnodeProcessStreamFetchMsg(SVnode* pVnode, SRpcMsg* pMsg) {
14,778✔
2025
  int32_t            code = 0;
14,778✔
2026
  int32_t            lino = 0;
14,778✔
2027
  void*              buf = NULL;
14,778✔
2028
  size_t             size = 0;
14,778✔
2029
  void*              taskAddr = NULL;
14,778✔
2030
  SArray*            pResList = NULL;
14,778✔
2031

2032
  SResFetchReq req = {0};
14,778✔
2033
  STREAM_CHECK_CONDITION_GOTO(tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0,
14,778!
2034
                              TSDB_CODE_QRY_INVALID_INPUT);
2035
  SArray* calcInfoList = (SArray*)qStreamGetReaderInfo(req.queryId, req.taskId, &taskAddr);
14,778✔
2036
  STREAM_CHECK_NULL_GOTO(calcInfoList, terrno);
14,778!
2037

2038
  STREAM_CHECK_CONDITION_GOTO(req.execId < 0, TSDB_CODE_INVALID_PARA);
14,778!
2039
  SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo = taosArrayGetP(calcInfoList, req.execId);
14,778✔
2040
  STREAM_CHECK_NULL_GOTO(sStreamReaderCalcInfo, terrno);
14,778!
2041
  void* pTask = sStreamReaderCalcInfo->pTask;
14,778✔
2042
  ST_TASK_DLOG("vgId:%d %s start, execId:%d, reset:%d, pTaskInfo:%p, scan type:%d", TD_VID(pVnode), __func__, req.execId, req.reset,
14,778✔
2043
               sStreamReaderCalcInfo->pTaskInfo, nodeType(sStreamReaderCalcInfo->calcAst->pNode));
2044

2045
  if (req.reset || sStreamReaderCalcInfo->pTaskInfo == NULL) {
14,778!
2046
    qDestroyTask(sStreamReaderCalcInfo->pTaskInfo);
14,697✔
2047
    int64_t uid = 0;
14,697✔
2048
    if (req.dynTbname) {
14,697✔
2049
      SArray* vals = req.pStRtFuncInfo->pStreamPartColVals;
6✔
2050
      for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
6!
2051
        SStreamGroupValue* pValue = taosArrayGet(vals, i);
6✔
2052
        if (pValue != NULL && pValue->isTbname) {
6!
2053
          uid = pValue->uid;
6✔
2054
          break;
6✔
2055
        }
2056
      }
2057
    }
2058
    
2059
    SReadHandle handle = {0};
14,697✔
2060
    handle.vnode = pVnode;
14,697✔
2061
    handle.uid = uid;
14,697✔
2062

2063
    initStorageAPI(&handle.api);
14,697✔
2064
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(sStreamReaderCalcInfo->calcAst->pNode) ||
14,697✔
2065
      QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == nodeType(sStreamReaderCalcInfo->calcAst->pNode)){
14,295✔
2066
      STimeRangeNode* node = (STimeRangeNode*)((STableScanPhysiNode*)(sStreamReaderCalcInfo->calcAst->pNode))->pTimeRange;
14,637✔
2067
      if (node != NULL) {
14,637✔
2068
        STREAM_CHECK_RET_GOTO(processCalaTimeRange(sStreamReaderCalcInfo, &req, node, &handle));
14,617!
2069
      }
2070
    }
2071

2072
    TSWAP(sStreamReaderCalcInfo->rtInfo.funcInfo, *req.pStRtFuncInfo);
14,697✔
2073
    handle.streamRtInfo = &sStreamReaderCalcInfo->rtInfo;
14,697✔
2074

2075
    // if (sStreamReaderCalcInfo->pTaskInfo == NULL) {
2076
    STREAM_CHECK_RET_GOTO(qCreateStreamExecTaskInfo(&sStreamReaderCalcInfo->pTaskInfo,
14,697✔
2077
                                                    sStreamReaderCalcInfo->calcScanPlan, &handle, NULL, TD_VID(pVnode),
2078
                                                    req.taskId));
2079
    // } else {
2080
    // STREAM_CHECK_RET_GOTO(qResetTableScan(sStreamReaderCalcInfo->pTaskInfo, handle.winRange));
2081
    // }
2082

2083
    STREAM_CHECK_RET_GOTO(qSetTaskId(sStreamReaderCalcInfo->pTaskInfo, req.taskId, req.queryId));
14,694!
2084
  }
2085

2086
  if (req.pOpParam != NULL) {
14,775✔
2087
    qUpdateOperatorParam(sStreamReaderCalcInfo->pTaskInfo, req.pOpParam);
48✔
2088
  }
2089
  
2090
  pResList = taosArrayInit(4, POINTER_BYTES);
14,775✔
2091
  STREAM_CHECK_NULL_GOTO(pResList, terrno);
14,775!
2092
  uint64_t ts = 0;
14,775✔
2093
  bool     hasNext = false;
14,775✔
2094
  STREAM_CHECK_RET_GOTO(qExecTaskOpt(sStreamReaderCalcInfo->pTaskInfo, pResList, &ts, &hasNext, NULL, req.pOpParam != NULL));
14,775✔
2095

2096
  for(size_t i = 0; i < taosArrayGetSize(pResList); i++){
24,192✔
2097
    SSDataBlock* pBlock = taosArrayGetP(pResList, i);
9,422✔
2098
    if (pBlock == NULL) continue;
9,422!
2099
    printDataBlock(pBlock, __func__, "fetch");
9,422✔
2100
    if (sStreamReaderCalcInfo->rtInfo.funcInfo.withExternalWindow) {
9,422✔
2101
      STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, sStreamReaderCalcInfo->pFilterInfo));
327!
2102
      printDataBlock(pBlock, __func__, "fetch filter");
327✔
2103
    }
2104
  }
2105

2106
  STREAM_CHECK_RET_GOTO(streamBuildFetchRsp(pResList, hasNext, &buf, &size, pVnode->config.tsdbCfg.precision));
14,770!
2107
  ST_TASK_DLOG("vgId:%d %s end:", TD_VID(pVnode), __func__);
14,770✔
2108

2109
end:
124✔
2110
  taosArrayDestroy(pResList);
14,778✔
2111
  streamReleaseTask(taskAddr);
14,778✔
2112

2113
  STREAM_PRINT_LOG_END(code, lino);
14,778!
2114
  SRpcMsg rsp = {.msgType = TDMT_STREAM_FETCH_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
14,778✔
2115
  tmsgSendRsp(&rsp);
14,778✔
2116
  tDestroySResFetchReq(&req);
14,778✔
2117
  return code;
14,778✔
2118
}
2119

2120
int32_t vnodeProcessStreamReaderMsg(SVnode* pVnode, SRpcMsg* pMsg) {
34,570✔
2121
  int32_t                   code = 0;
34,570✔
2122
  int32_t                   lino = 0;
34,570✔
2123
  SSTriggerPullRequestUnion req = {0};
34,570✔
2124
  void*                     taskAddr = NULL;
34,570✔
2125

2126
  vDebug("vgId:%d, msg:%p in stream reader queue is processing", pVnode->config.vgId, pMsg);
34,570✔
2127
  if (!syncIsReadyForRead(pVnode->sync)) {
34,571✔
2128
    vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
89✔
2129
    return 0;
93✔
2130
  }
2131

2132
  if (pMsg->msgType == TDMT_STREAM_FETCH) {
34,501✔
2133
    return vnodeProcessStreamFetchMsg(pVnode, pMsg);
14,778✔
2134
  } else if (pMsg->msgType == TDMT_STREAM_TRIGGER_PULL) {
19,723!
2135
    void*   pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
19,724✔
2136
    int32_t len = pMsg->contLen - sizeof(SMsgHead);
19,724✔
2137
    STREAM_CHECK_RET_GOTO(tDeserializeSTriggerPullRequest(pReq, len, &req));
19,724!
2138
    stDebug("vgId:%d %s start, type:%d, streamId:%" PRIx64 ", readerTaskId:%" PRIx64 ", sessionId:%" PRIx64,
19,710✔
2139
            TD_VID(pVnode), __func__, req.base.type, req.base.streamId, req.base.readerTaskId, req.base.sessionId);
2140
    SStreamTriggerReaderInfo* sStreamReaderInfo = (STRIGGER_PULL_OTABLE_INFO == req.base.type) ? NULL : qStreamGetReaderInfo(req.base.streamId, req.base.readerTaskId, &taskAddr);
19,708✔
2141
    switch (req.base.type) {
19,696!
2142
      case STRIGGER_PULL_SET_TABLE:
63✔
2143
        code = vnodeProcessStreamSetTableReq(pVnode, pMsg, &req, sStreamReaderInfo);
63✔
2144
        break;
63✔
2145
      case STRIGGER_PULL_LAST_TS:
340✔
2146
        code = vnodeProcessStreamLastTsReq(pVnode, pMsg, &req, sStreamReaderInfo);
340✔
2147
        break;
339✔
2148
      case STRIGGER_PULL_FIRST_TS:
160✔
2149
        code = vnodeProcessStreamFirstTsReq(pVnode, pMsg, &req, sStreamReaderInfo);
160✔
2150
        break;
160✔
2151
      case STRIGGER_PULL_TSDB_META:
86✔
2152
      case STRIGGER_PULL_TSDB_META_NEXT:
2153
        code = vnodeProcessStreamTsdbMetaReq(pVnode, pMsg, &req, sStreamReaderInfo);
86✔
2154
        break;
86✔
2155
      case STRIGGER_PULL_TSDB_TS_DATA:
8✔
2156
        code = vnodeProcessStreamTsdbTsDataReq(pVnode, pMsg, &req, sStreamReaderInfo);
8✔
2157
        break;
8✔
2158
      case STRIGGER_PULL_TSDB_TRIGGER_DATA:
254✔
2159
      case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT:
2160
        code = vnodeProcessStreamTsdbTriggerDataReq(pVnode, pMsg, &req, sStreamReaderInfo);
254✔
2161
        break;
254✔
2162
      case STRIGGER_PULL_TSDB_CALC_DATA:
181✔
2163
      case STRIGGER_PULL_TSDB_CALC_DATA_NEXT:
2164
        code = vnodeProcessStreamTsdbCalcDataReq(pVnode, pMsg, &req, sStreamReaderInfo);
181✔
2165
        break;
181✔
2166
      case STRIGGER_PULL_TSDB_DATA:
47✔
2167
      case STRIGGER_PULL_TSDB_DATA_NEXT:
2168
        code = vnodeProcessStreamTsdbVirtalDataReq(pVnode, pMsg, &req, sStreamReaderInfo);
47✔
2169
        break;
47✔
2170
      case STRIGGER_PULL_WAL_META:
7,128✔
2171
        code = vnodeProcessStreamWalMetaReq(pVnode, pMsg, &req, sStreamReaderInfo);
7,128✔
2172
        break;
7,145✔
2173
      case STRIGGER_PULL_WAL_TS_DATA:
10,259✔
2174
      case STRIGGER_PULL_WAL_TRIGGER_DATA:
2175
      case STRIGGER_PULL_WAL_CALC_DATA:
2176
      case STRIGGER_PULL_WAL_DATA:
2177
        code = vnodeProcessStreamWalDataReq(pVnode, pMsg, &req, sStreamReaderInfo);
10,259✔
2178
        break;
10,263✔
2179
      case STRIGGER_PULL_GROUP_COL_VALUE:
499✔
2180
        code = vnodeProcessStreamGroupColValueReq(pVnode, pMsg, &req, sStreamReaderInfo);
499✔
2181
        break;
499✔
2182
      case STRIGGER_PULL_VTABLE_INFO:
165✔
2183
        code = vnodeProcessStreamVTableInfoReq(pVnode, pMsg, &req, sStreamReaderInfo);
165✔
2184
        break;
165✔
2185
      case STRIGGER_PULL_VTABLE_PSEUDO_COL:
442✔
2186
        code = vnodeProcessStreamVTableTagInfoReq(pVnode, pMsg, &req);
442✔
2187
        break;
442✔
2188
      case STRIGGER_PULL_OTABLE_INFO:
64✔
2189
        code = vnodeProcessStreamOTableInfoReq(pVnode, pMsg, &req);
64✔
2190
        break;
64✔
2191
      default:
×
2192
        vError("unknown inner msg type:%d in stream reader queue", req.base.type);
×
2193
        code = TSDB_CODE_APP_ERROR;
×
2194
        break;
×
2195
    }
2196
  } else {
2197
    vError("unknown msg type:%d in stream reader queue", pMsg->msgType);
×
2198
    code = TSDB_CODE_APP_ERROR;
×
2199
  }
2200
end:
19,716✔
2201

2202
  streamReleaseTask(taskAddr);
19,716✔
2203

2204
  tDestroySTriggerPullRequest(&req);
19,720✔
2205
  STREAM_PRINT_LOG_END(code, lino);
19,719!
2206
  return code;
19,706✔
2207
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc