• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4665

12 Aug 2025 07:34AM UTC coverage: 59.901% (-0.6%) from 60.536%
#4665

push

travis-ci

web-flow
Merge pull request #32547 from taosdata/refactor/wangxu/get-started-installer

refactor: get started for installer and docker

137370 of 291999 branches covered (47.04%)

Branch coverage included in aggregate %.

207872 of 284354 relevant lines covered (73.1%)

4846328.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.5
/source/dnode/vnode/src/vnd/vnodeStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "scalar.h"
17
#include "streamReader.h"
18
#include "tdb.h"
19
#include "tencode.h"
20
#include "tglobal.h"
21
#include "tmsg.h"
22
#include "vnd.h"
23
#include "vnode.h"
24
#include "vnodeInt.h"
25

26
#define BUILD_OPTION(options, sStreamInfo, _ver, _groupSort, _order, startTime, endTime, _schemas, _isSchema, _scanMode, \
27
                     _gid, _initReader, _uidList)                                                                        \
28
  SStreamTriggerReaderTaskInnerOptions options = {.suid = (_uidList == NULL ? sStreamInfo->suid : 0),              \
29
                                                  .uid = sStreamInfo->uid,                                         \
30
                                                  .ver = _ver,                                                     \
31
                                                  .tableType = sStreamInfo->tableType,                             \
32
                                                  .groupSort = _groupSort,                                         \
33
                                                  .order = _order,                                                 \
34
                                                  .twindows = {.skey = startTime, .ekey = endTime},                \
35
                                                  .pTagCond = sStreamInfo->pTagCond,                               \
36
                                                  .pTagIndexCond = sStreamInfo->pTagIndexCond,                     \
37
                                                  .pConditions = sStreamInfo->pConditions,                         \
38
                                                  .partitionCols = sStreamInfo->partitionCols,                     \
39
                                                  .schemas = _schemas,                                             \
40
                                                  .isSchema = _isSchema,                                           \
41
                                                  .scanMode = _scanMode,                                           \
42
                                                  .gid = _gid,                                                     \
43
                                                  .initReader = _initReader,                                       \
44
                                                  .uidList = _uidList};
45

46
static int64_t getSessionKey(int64_t session, int64_t type) { return (session | (type << 32)); }
164,437✔
47

48
static int32_t buildScheamFromMeta(SVnode* pVnode, int64_t uid, SArray** schemas);
49

50
static int32_t addColData(SSDataBlock* pResBlock, int32_t index, void* data) {
37,402✔
51
  SColumnInfoData* pSrc = taosArrayGet(pResBlock->pDataBlock, index);
37,402✔
52
  if (pSrc == NULL) {
37,385!
53
    return terrno;
×
54
  }
55

56
  memcpy(pSrc->pData + pResBlock->info.rows * pSrc->info.bytes, data, pSrc->info.bytes);
37,400✔
57
  return 0;
37,400✔
58
}
59

60
static int32_t getTableDataInfo(SStreamReaderTaskInner* pTask, bool* hasNext) {
168,935✔
61
  int32_t code = pTask->api.tsdReader.tsdNextDataBlock(pTask->pReader, hasNext);
168,935✔
62
  if (code != TSDB_CODE_SUCCESS) {
168,927!
63
    pTask->api.tsdReader.tsdReaderReleaseDataBlock(pTask->pReader);
×
64
  }
65

66
  return code;
168,927✔
67
}
68

69
static int32_t getTableData(SStreamReaderTaskInner* pTask, SSDataBlock** ppRes) {
3,102✔
70
  return pTask->api.tsdReader.tsdReaderRetrieveDataBlock(pTask->pReader, ppRes, NULL);
3,102✔
71
}
72

73
static int32_t buildOTableInfoRsp(const SSTriggerOrigTableInfoRsp* rsp, void** data, size_t* size) {
66✔
74
  int32_t code = 0;
66✔
75
  int32_t lino = 0;
66✔
76
  void*   buf = NULL;
66✔
77
  int32_t len = tSerializeSTriggerOrigTableInfoRsp(NULL, 0, rsp);
66✔
78
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
66!
79
  buf = rpcMallocCont(len);
66✔
80
  STREAM_CHECK_NULL_GOTO(buf, terrno);
66!
81
  int32_t actLen = tSerializeSTriggerOrigTableInfoRsp(buf, len, rsp);
66✔
82
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
66!
83
  *data = buf;
66✔
84
  *size = len;
66✔
85
  buf = NULL;
66✔
86
end:
66✔
87
  rpcFreeCont(buf);
66✔
88
  return code;
66✔
89
}
90

91
static int32_t buildVTableInfoRsp(const SStreamMsgVTableInfo* rsp, void** data, size_t* size) {
177✔
92
  int32_t code = 0;
177✔
93
  int32_t lino = 0;
177✔
94
  void*   buf = NULL;
177✔
95
  int32_t len = tSerializeSStreamMsgVTableInfo(NULL, 0, rsp);
177✔
96
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
178!
97
  buf = rpcMallocCont(len);
178✔
98
  STREAM_CHECK_NULL_GOTO(buf, terrno);
178!
99
  int32_t actLen = tSerializeSStreamMsgVTableInfo(buf, len, rsp);
178✔
100
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
178!
101
  *data = buf;
178✔
102
  *size = len;
178✔
103
  buf = NULL;
178✔
104
end:
178✔
105
  rpcFreeCont(buf);
178✔
106
  return code;
178✔
107
}
108

109
static int32_t buildTsRsp(const SStreamTsResponse* tsRsp, void** data, size_t* size) {
670✔
110
  int32_t code = 0;
670✔
111
  int32_t lino = 0;
670✔
112
  void*   buf = NULL;
670✔
113
  int32_t len = tSerializeSStreamTsResponse(NULL, 0, tsRsp);
670✔
114
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
668!
115
  buf = rpcMallocCont(len);
668✔
116
  STREAM_CHECK_NULL_GOTO(buf, terrno);
671!
117
  int32_t actLen = tSerializeSStreamTsResponse(buf, len, tsRsp);
671✔
118
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
671!
119
  *data = buf;
671✔
120
  *size = len;
671✔
121
  buf = NULL;
671✔
122
end:
671✔
123
  rpcFreeCont(buf);
671✔
124
  return code;
671✔
125
}
126

127

128
static int32_t buildRsp(SSDataBlock* pBlock, void** data, size_t* size) {
180,891✔
129
  int32_t code = 0;
180,891✔
130
  int32_t lino = 0;
180,891✔
131
  void*   buf = NULL;
180,891✔
132
  STREAM_CHECK_CONDITION_GOTO(pBlock == NULL || pBlock->info.rows == 0, TSDB_CODE_SUCCESS);
180,891!
133
  size_t dataEncodeSize = blockGetEncodeSize(pBlock);
10,265✔
134
  buf = rpcMallocCont(dataEncodeSize);
10,260✔
135
  STREAM_CHECK_NULL_GOTO(buf, terrno);
10,262!
136
  int32_t actualLen = blockEncode(pBlock, buf, dataEncodeSize, taosArrayGetSize(pBlock->pDataBlock));
10,262✔
137
  STREAM_CHECK_CONDITION_GOTO(actualLen < 0, terrno);
10,264!
138
  *data = buf;
10,264✔
139
  *size = dataEncodeSize;
10,264✔
140
  buf = NULL;
10,264✔
141
end:
180,890✔
142
  rpcFreeCont(buf);
180,890✔
143
  return code;
180,884✔
144
}
145

146
static int32_t resetTsdbReader(SStreamReaderTaskInner* pTask) {
383✔
147
  int32_t        pNum = 1;
383✔
148
  STableKeyInfo* pList = NULL;
383✔
149
  int32_t        code = 0;
383✔
150
  int32_t        lino = 0;
383✔
151
  STREAM_CHECK_RET_GOTO(qStreamGetTableList(pTask->pTableList, pTask->currentGroupIndex, &pList, &pNum));
383!
152
  STREAM_CHECK_RET_GOTO(pTask->api.tsdReader.tsdSetQueryTableList(pTask->pReader, pList, pNum));
383!
153

154
  cleanupQueryTableDataCond(&pTask->cond);
383✔
155
  STREAM_CHECK_RET_GOTO(qStreamInitQueryTableDataCond(&pTask->cond, pTask->options.order, pTask->options.schemas, true,
383!
156
                                                      pTask->options.twindows, pTask->options.suid, pTask->options.ver));
157
  STREAM_CHECK_RET_GOTO(pTask->api.tsdReader.tsdReaderResetStatus(pTask->pReader, &pTask->cond));
383!
158

159
end:
383✔
160
  STREAM_PRINT_LOG_END(code, lino);
383!
161
  return code;
383✔
162
}
163

164
static int32_t buildWalMetaBlock(SSDataBlock* pBlock, int8_t type, int64_t gid, bool isVTable, int64_t uid,
5,866✔
165
                                 int64_t skey, int64_t ekey, int64_t ver, int64_t rows) {
166
  int32_t code = 0;
5,866✔
167
  int32_t lino = 0;
5,866✔
168
  int32_t index = 0;
5,866✔
169
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &type));
5,866!
170
  if (!isVTable) {
5,839✔
171
    STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &gid));
3,134!
172
  }
173
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &uid));
5,806!
174
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &skey));
5,751!
175
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &ekey));
5,735!
176
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &ver));
5,735!
177
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &rows));
5,739!
178

179
end:
5,749✔
180
  STREAM_PRINT_LOG_END(code, lino)
5,749!
181
  return code;
5,758✔
182
}
183

184
static void buildTSchema(STSchema* pTSchema, int32_t ver, col_id_t colId, int8_t type, int32_t bytes) {
5,839✔
185
  pTSchema->numOfCols = 1;
5,839✔
186
  pTSchema->version = ver;
5,839✔
187
  pTSchema->columns[0].colId = colId;
5,839✔
188
  pTSchema->columns[0].type = type;
5,839✔
189
  pTSchema->columns[0].bytes = bytes;
5,839✔
190
}
5,839✔
191

192
int32_t retrieveWalMetaData(SSubmitTbData* pSubmitTbData, void* pTableList, bool isVTable, SSDataBlock* pBlock,
53,354✔
193
                            int64_t ver) {
194
  int32_t code = 0;
53,354✔
195
  int32_t lino = 0;
53,354✔
196

197
  int64_t   uid = pSubmitTbData->uid;
53,354✔
198
  int32_t   numOfRows = 0;
53,354✔
199
  int64_t   skey = 0;
53,354✔
200
  int64_t   ekey = 0;
53,354✔
201
  STSchema* pTSchema = NULL;
53,354✔
202
  uint64_t  gid = 0;
53,354✔
203
  if (isVTable) {
53,354✔
204
    STREAM_CHECK_CONDITION_GOTO(taosHashGet(pTableList, &uid, sizeof(uid)) == NULL, TDB_CODE_SUCCESS);
8,740✔
205
  } else {
206
    STREAM_CHECK_CONDITION_GOTO(!qStreamUidInTableList(pTableList, uid), TDB_CODE_SUCCESS);
44,614✔
207
    gid = qStreamGetGroupId(pTableList, uid);
3,063✔
208
  }
209

210
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
5,829!
211
    SColData* pCol = taosArrayGet(pSubmitTbData->aCol, 0);
×
212
    STREAM_CHECK_NULL_GOTO(pCol, terrno);
×
213
    numOfRows = pCol->nVal;
×
214

215
    SColVal colVal = {0};
×
216
    STREAM_CHECK_RET_GOTO(tColDataGetValue(pCol, 0, &colVal));
×
217
    skey = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
×
218

219
    STREAM_CHECK_RET_GOTO(tColDataGetValue(pCol, numOfRows - 1, &colVal));
×
220
    ekey = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
×
221
  } else {
222
    numOfRows = taosArrayGetSize(pSubmitTbData->aRowP);
5,829✔
223
    SRow* pRow = taosArrayGetP(pSubmitTbData->aRowP, 0);
5,818✔
224
    STREAM_CHECK_NULL_GOTO(pRow, terrno);
5,804!
225
    SColVal colVal = {0};
5,804✔
226
    pTSchema = taosMemoryCalloc(1, sizeof(STSchema) + sizeof(STColumn));
5,804!
227
    STREAM_CHECK_NULL_GOTO(pTSchema, terrno);
5,849!
228
    buildTSchema(pTSchema, pSubmitTbData->sver, PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES);
5,849✔
229
    STREAM_CHECK_RET_GOTO(tRowGet(pRow, pTSchema, 0, &colVal));
5,831!
230
    skey = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
5,819✔
231
    pRow = taosArrayGetP(pSubmitTbData->aRowP, numOfRows - 1);
5,819✔
232
    STREAM_CHECK_RET_GOTO(tRowGet(pRow, pTSchema, 0, &colVal));
5,801!
233
    ekey = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
5,796✔
234
  }
235

236
  STREAM_CHECK_RET_GOTO(buildWalMetaBlock(pBlock, WAL_SUBMIT_DATA, gid, isVTable, uid, skey, ekey, ver, numOfRows));
5,796!
237
  pBlock->info.rows++;
5,692✔
238
  stDebug("stream reader scan submit data:uid %" PRIu64 ", skey %" PRIu64 ", ekey %" PRIu64 ", gid %" PRIu64
5,692✔
239
          ", rows:%d",
240
          uid, skey, ekey, gid, numOfRows);
241

242
end:
3,292✔
243
  taosMemoryFree(pTSchema);
53,408!
244
  STREAM_PRINT_LOG_END(code, lino)
53,508!
245
  return code;
53,512✔
246
}
247

248
int32_t retrieveWalData(SVnode* pVnode, SSubmitTbData* pSubmitTbData, SSDataBlock* pBlock, STimeWindow* window) {
7,514✔
249
  stDebug("stream reader retrieve data block %p", pSubmitTbData);
7,514✔
250
  int32_t code = 0;
7,514✔
251
  int32_t lino = 0;
7,514✔
252

253
  int32_t ver = pSubmitTbData->sver;
7,514✔
254
  int64_t uid = pSubmitTbData->uid;
7,514✔
255
  int32_t numOfRows = 0;
7,514✔
256

257
  STSchema* schemas = metaGetTbTSchema(pVnode->pMeta, uid, ver, 1);
7,514✔
258
  STREAM_CHECK_NULL_GOTO(schemas, TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND);
7,525!
259
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
7,525!
260
    SColData* pCol = taosArrayGet(pSubmitTbData->aCol, 0);
×
261
    STREAM_CHECK_NULL_GOTO(pCol, terrno);
×
262
    int32_t rowStart = 0;
×
263
    int32_t rowEnd = pCol->nVal;
×
264
    if (window != NULL) {
×
265
      SColVal colVal = {0};
×
266
      rowStart = -1;
×
267
      rowEnd = -1;
×
268
      for (int32_t k = 0; k < pCol->nVal; k++) {
×
269
        STREAM_CHECK_RET_GOTO(tColDataGetValue(pCol, k, &colVal));
×
270
        int64_t ts = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
×
271
        if (ts >= window->skey && rowStart == -1) {
×
272
          rowStart = k;
×
273
        }
274
        if (ts > window->ekey && rowEnd == -1) {
×
275
          rowEnd = k;
×
276
        }
277
      }
278
      STREAM_CHECK_CONDITION_GOTO(rowStart == -1 || rowStart == rowEnd, TDB_CODE_SUCCESS);
×
279

280
      if (rowStart != -1 && rowEnd == -1) {
×
281
        rowEnd = pCol->nVal;
×
282
      }
283
    }
284
    numOfRows = rowEnd - rowStart;
×
285
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, numOfRows));
×
286
    for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
×
287
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
×
288
      STREAM_CHECK_NULL_GOTO(pColData, terrno);
×
289
      if (i >= taosArrayGetSize(pSubmitTbData->aCol)) {
×
290
        break;
×
291
      }
292
      for (int32_t j = 0; j < taosArrayGetSize(pSubmitTbData->aCol); j++) {
×
293
        SColData* pCol = taosArrayGet(pSubmitTbData->aCol, j);
×
294
        STREAM_CHECK_NULL_GOTO(pCol, terrno);
×
295
        if (pCol->cid == pColData->info.colId) {
×
296
          for (int32_t k = rowStart; k < rowEnd; k++) {
×
297
            SColVal colVal = {0};
×
298
            STREAM_CHECK_RET_GOTO(tColDataGetValue(pCol, k, &colVal));
×
299
            STREAM_CHECK_RET_GOTO(colDataSetVal(pColData, k, VALUE_GET_DATUM(&colVal.value, colVal.value.type),
×
300
                                                !COL_VAL_IS_VALUE(&colVal)));
301
          }
302
        }
303
      }
304
    }
305
  } else {
306
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, taosArrayGetSize(pSubmitTbData->aRowP)));
7,525!
307
    for (int32_t j = 0; j < taosArrayGetSize(pSubmitTbData->aRowP); j++) {
523,590✔
308
      SRow* pRow = taosArrayGetP(pSubmitTbData->aRowP, j);
516,067✔
309
      STREAM_CHECK_NULL_GOTO(pRow, terrno);
516,071!
310
      SColVal colVal = {0};
516,071✔
311
      STREAM_CHECK_RET_GOTO(tRowGet(pRow, schemas, 0, &colVal));
516,071!
312
      int64_t ts = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
516,073✔
313
      if (window != NULL && (ts < window->skey || ts > window->ekey)) {
516,073!
314
        continue;
45✔
315
      }
316
      for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {  // reader todo test null
1,551,840✔
317
        if (i >= schemas->numOfCols) {
1,549,639✔
318
          break;
513,803✔
319
        }
320
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
1,035,836✔
321
        STREAM_CHECK_NULL_GOTO(pColData, terrno);
1,035,834!
322
        SColVal colVal = {0};
1,035,835✔
323
        int32_t sourceIdx = 0;
1,035,835✔
324
        while (1) {
325
          STREAM_CHECK_RET_GOTO(tRowGet(pRow, schemas, sourceIdx, &colVal));
1,561,443!
326
          if (colVal.cid < pColData->info.colId) {
1,561,412✔
327
            sourceIdx++;
525,608✔
328
            continue;
525,608✔
329
          } else {
330
            break;
1,035,804✔
331
          }
332
        }
333
        if (colVal.cid == pColData->info.colId && COL_VAL_IS_VALUE(&colVal)) {
1,035,804!
334
          if (IS_VAR_DATA_TYPE(colVal.value.type) || colVal.value.type == TSDB_DATA_TYPE_DECIMAL){
1,034,918!
335
            STREAM_CHECK_RET_GOTO(varColSetVarData(pColData, numOfRows, colVal.value.pData, colVal.value.nData, !COL_VAL_IS_VALUE(&colVal)));
19!
336
          } else {
337
            STREAM_CHECK_RET_GOTO(colDataSetVal(pColData, numOfRows, (const char*)(&(colVal.value.val)), !COL_VAL_IS_VALUE(&colVal)));
1,034,899!
338
          }
339
        } else {
340
          colDataSetNULL(pColData, numOfRows);
886!
341
        }
342
      }
343
      numOfRows++;
516,021✔
344
    }
345
  }
346

347
  pBlock->info.rows = numOfRows;
7,527✔
348

349
end:
7,527✔
350
  taosMemoryFree(schemas);
7,527!
351
  STREAM_PRINT_LOG_END(code, lino)
7,527!
352
  return code;
7,523✔
353
}
354

355
static int32_t buildDeleteData(void* pTableList, bool isVTable, SSDataBlock* pBlock, SDeleteRes* req, int64_t uid, int64_t ver){
128✔
356
  int32_t    code = 0;
128✔
357
  int32_t    lino = 0;
128✔
358
  uint64_t   gid = 0;
128✔
359
  stDebug("stream reader scan delete start data:uid %" PRIu64 ", skey %" PRIu64 ", ekey %" PRIu64, uid, req->skey, req->ekey);
128✔
360
  if (isVTable) {
128✔
361
    STREAM_CHECK_CONDITION_GOTO(taosHashGet(pTableList, &uid, sizeof(uid)) == NULL, TDB_CODE_SUCCESS)
30✔
362
  } else {
363
    gid = qStreamGetGroupId(pTableList, uid);
98✔
364
    STREAM_CHECK_CONDITION_GOTO(gid == -1, TDB_CODE_SUCCESS);
98✔
365
  }
366
  STREAM_CHECK_RET_GOTO(
64!
367
      buildWalMetaBlock(pBlock, WAL_DELETE_DATA, gid, isVTable, uid, req->skey, req->ekey, ver, 1));
368
  pBlock->info.rows++;
64✔
369

370
  stDebug("stream reader scan delete end data:uid %" PRIu64 ", skey %" PRIu64 ", ekey %" PRIu64, uid, req->skey, req->ekey);
64✔
371
end:
48✔
372
  return code;
128✔
373
}
374

375
static int32_t scanDeleteData(void* pTableList, bool isVTable, SSDataBlock* pBlock, void* data, int32_t len,
128✔
376
                              int64_t ver) {
377
  int32_t    code = 0;
128✔
378
  int32_t    lino = 0;
128✔
379
  SDecoder   decoder = {0};
128✔
380
  SDeleteRes req = {0};
128✔
381
  req.uidList = taosArrayInit(0, sizeof(tb_uid_t));
128✔
382
  tDecoderInit(&decoder, data, len);
128✔
383
  STREAM_CHECK_RET_GOTO(tDecodeDeleteRes(&decoder, &req));
128!
384
  
385
  for (int32_t i = 0; i < taosArrayGetSize(req.uidList); i++) {
256✔
386
    uint64_t* uid = taosArrayGet(req.uidList, i);
128✔
387
    STREAM_CHECK_NULL_GOTO(uid, terrno);
128!
388
    STREAM_CHECK_RET_GOTO(buildDeleteData(pTableList, isVTable, pBlock, &req, *uid, ver));
128!
389
  }
390

391
end:
128✔
392
  taosArrayDestroy(req.uidList);
128✔
393
  tDecoderClear(&decoder);
128✔
394
  return code;
128✔
395
}
396

397
static int32_t scanDropTable(void* pTableList, bool isVTable, SSDataBlock* pBlock, void* data, int32_t len,
×
398
                             int64_t ver) {
399
  int32_t  code = 0;
×
400
  int32_t  lino = 0;
×
401
  SDecoder decoder = {0};
×
402

403
  SVDropTbBatchReq req = {0};
×
404
  tDecoderInit(&decoder, data, len);
×
405
  STREAM_CHECK_RET_GOTO(tDecodeSVDropTbBatchReq(&decoder, &req));
×
406
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
407
    SVDropTbReq* pDropTbReq = req.pReqs + iReq;
×
408
    STREAM_CHECK_NULL_GOTO(pDropTbReq, TSDB_CODE_INVALID_PARA);
×
409
    uint64_t gid = 0;
×
410
    if (isVTable) {
×
411
      if (taosHashGet(pTableList, &pDropTbReq->uid, sizeof(pDropTbReq->uid)) == NULL) {
×
412
        continue;
×
413
      }
414
    } else {
415
      gid = qStreamGetGroupId(pTableList, pDropTbReq->uid);
×
416
      if (gid == -1) continue;
×
417
    }
418

419
    STREAM_CHECK_RET_GOTO(buildWalMetaBlock(pBlock, WAL_DELETE_TABLE, gid, isVTable, pDropTbReq->uid, 0, 0, ver, 1));
×
420
    pBlock->info.rows++;
×
421
    stDebug("stream reader scan drop :uid %" PRIu64 ", gid %" PRIu64, pDropTbReq->uid, gid);
×
422
  }
423

424
end:
×
425
  tDecoderClear(&decoder);
×
426
  return code;
×
427
}
428

429
static int32_t scanSubmitData(void* pTableList, bool isVTable, SSDataBlock* pBlock, void* data, int32_t len,
53,463✔
430
                              int64_t ver) {
431
  int32_t  code = 0;
53,463✔
432
  int32_t  lino = 0;
53,463✔
433
  SDecoder decoder = {0};
53,463✔
434

435
  SSubmitReq2 submit = {0};
53,463✔
436
  tDecoderInit(&decoder, data, len);
53,463✔
437
  STREAM_CHECK_RET_GOTO(tDecodeSubmitReq(&decoder, &submit, NULL));
53,448!
438

439
  int32_t numOfBlocks = taosArrayGetSize(submit.aSubmitTbData);
53,432✔
440

441
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, pBlock->info.rows + numOfBlocks));
53,418!
442

443
  int32_t nextBlk = -1;
53,392✔
444
  while (++nextBlk < numOfBlocks) {
106,893✔
445
    stDebug("stream reader scan submit, next data block %d/%d", nextBlk, numOfBlocks);
53,395✔
446
    SSubmitTbData* pSubmitTbData = taosArrayGet(submit.aSubmitTbData, nextBlk);
53,395✔
447
    STREAM_CHECK_NULL_GOTO(pSubmitTbData, terrno);
53,361!
448
    STREAM_CHECK_RET_GOTO(retrieveWalMetaData(pSubmitTbData, pTableList, isVTable, pBlock, ver));
53,361!
449
  }
450
end:
53,498✔
451
  tDestroySubmitReq(&submit, TSDB_MSG_FLG_DECODE);
53,498✔
452
  tDecoderClear(&decoder);
53,503✔
453
  return code;
53,540✔
454
}
455

456
static int32_t scanWal(SVnode* pVnode, void* pTableList, bool isVTable, SSDataBlock* pBlock, int64_t lastVer,
8,056✔
457
                       int8_t deleteData, int8_t deleteTb, int64_t ctime, int64_t* retVer) {
458
  int32_t code = 0;
8,056✔
459
  int32_t lino = 0;
8,056✔
460

461
  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
8,056✔
462
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);
8,043!
463
  *retVer = walGetAppliedVer(pWalReader->pWal);
8,043✔
464
  STREAM_CHECK_CONDITION_GOTO(walReaderSeekVer(pWalReader, lastVer + 1) != 0, TSDB_CODE_SUCCESS);
8,056✔
465

466
  while (1) {
54,030✔
467
    *retVer = walGetAppliedVer(pWalReader->pWal);
55,469✔
468
    STREAM_CHECK_CONDITION_GOTO(walNextValidMsg(pWalReader, true) < 0, TSDB_CODE_SUCCESS);
55,445✔
469

470
    SWalCont* wCont = &pWalReader->pHead->head;
53,986✔
471
    if (wCont->ingestTs / 1000 > ctime) break;
53,986!
472
    void*   data = POINTER_SHIFT(wCont->body, sizeof(SMsgHead));
53,986✔
473
    int32_t len = wCont->bodyLen - sizeof(SMsgHead);
53,986✔
474
    int64_t ver = wCont->version;
53,986✔
475

476
    stDebug("vgId:%d stream reader scan wal ver:%" PRId64 ", type:%d, deleteData:%d, deleteTb:%d",
53,986✔
477
      TD_VID(pVnode), ver, wCont->msgType, deleteData, deleteTb);
478
    if (wCont->msgType == TDMT_VND_DELETE && deleteData != 0) {
53,981✔
479
      STREAM_CHECK_RET_GOTO(scanDeleteData(pTableList, isVTable, pBlock, data, len, ver));
128!
480
    } else if (wCont->msgType == TDMT_VND_DROP_TABLE && deleteTb != 0) {
53,853!
481
      STREAM_CHECK_RET_GOTO(scanDropTable(pTableList, isVTable, pBlock, data, len, ver));
×
482
    } else if (wCont->msgType == TDMT_VND_SUBMIT) {
53,853✔
483
      data = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
53,479✔
484
      len = wCont->bodyLen - sizeof(SSubmitReq2Msg);
53,479✔
485
      STREAM_CHECK_RET_GOTO(scanSubmitData(pTableList, isVTable, pBlock, data, len, ver));
53,479!
486
    }
487

488
    if (pBlock->info.rows >= STREAM_RETURN_ROWS_NUM) {
54,030!
489
      break;
×
490
    }
491
  }
492

493
end:
8,052✔
494
  walCloseReader(pWalReader);
8,052✔
495
  STREAM_PRINT_LOG_END(code, lino);
8,060!
496
  return code;
8,059✔
497
}
498

499
int32_t scanWalOneVer(SVnode* pVnode, SSDataBlock* pBlock, SSDataBlock* pBlockRet,
7,525✔
500
                      int64_t ver, int64_t uid, STimeWindow* window) {
501
  int32_t     code = 0;
7,525✔
502
  int32_t     lino = 0;
7,525✔
503
  SSubmitReq2 submit = {0};
7,525✔
504
  SDecoder    decoder = {0};
7,525✔
505

506
  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
7,525✔
507
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);
7,526!
508

509
  STREAM_CHECK_RET_GOTO(walFetchHead(pWalReader, ver));
7,526!
510
  STREAM_CHECK_CONDITION_GOTO(pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT, TSDB_CODE_STREAM_WAL_VER_NOT_DATA);
7,522!
511
  STREAM_CHECK_RET_GOTO(walFetchBody(pWalReader));
7,522!
512
  void*   pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
7,527✔
513
  int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
7,527✔
514

515
  int32_t nextBlk = -1;
7,527✔
516
  tDecoderInit(&decoder, pBody, bodyLen);
7,527✔
517
  STREAM_CHECK_RET_GOTO(tDecodeSubmitReq(&decoder, &submit, NULL));
7,526!
518

519
  int32_t numOfBlocks = taosArrayGetSize(submit.aSubmitTbData);
7,517✔
520
  while (++nextBlk < numOfBlocks) {
15,034✔
521
    stDebug("stream reader next data block %d/%d", nextBlk, numOfBlocks);
7,513✔
522
    SSubmitTbData* pSubmitTbData = taosArrayGet(submit.aSubmitTbData, nextBlk);
7,513✔
523
    STREAM_CHECK_NULL_GOTO(pSubmitTbData, terrno);
7,514!
524
    if (pSubmitTbData->uid != uid) {
7,516!
525
      stDebug("stream reader skip data block uid:%" PRId64, pSubmitTbData->uid);
×
526
      continue;
×
527
    }
528
    STREAM_CHECK_RET_GOTO(retrieveWalData(pVnode, pSubmitTbData, pBlock, window));
7,516!
529
    printDataBlock(pBlock, __func__, "");
7,524✔
530

531
    STREAM_CHECK_RET_GOTO(blockDataMerge(pBlockRet, pBlock));
7,522!
532
    blockDataCleanup(pBlock);
7,525✔
533
  }
534

535
end:
7,521✔
536
  tDestroySubmitReq(&submit, TSDB_MSG_FLG_DECODE);
7,521✔
537
  walCloseReader(pWalReader);
7,524✔
538
  tDecoderClear(&decoder);
7,531✔
539
  STREAM_PRINT_LOG_END(code, lino);
7,530!
540
  return code;
7,528✔
541
}
542

543
static int32_t processTag(SVnode* pVnode, SExprInfo* pExpr, int32_t numOfExpr, SStorageAPI* api, SSDataBlock* pBlock) {
5,508✔
544
  int32_t     code = 0;
5,508✔
545
  int32_t     lino = 0;
5,508✔
546
  SMetaReader mr = {0};
5,508✔
547

548
  if (numOfExpr == 0) {
5,508!
549
    return TSDB_CODE_SUCCESS;
×
550
  }
551
  api->metaReaderFn.initReader(&mr, pVnode, META_READER_LOCK, &api->metaFn);
5,508✔
552
  code = api->metaReaderFn.getEntryGetUidCache(&mr, pBlock->info.id.uid);
5,510✔
553
  if (code != TSDB_CODE_SUCCESS) {
5,496!
554
    stError("failed to get table meta, uid:%" PRId64 ", code:%s", pBlock->info.id.uid, tstrerror(code));
×
555
  }
556
  api->metaReaderFn.readerReleaseLock(&mr);
5,496✔
557
  for (int32_t j = 0; j < numOfExpr; ++j) {
23,388✔
558
    const SExprInfo* pExpr1 = &pExpr[j];
17,893✔
559
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;
17,893✔
560

561
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
17,893✔
562
    STREAM_CHECK_NULL_GOTO(pColInfoData, terrno);
17,889!
563
    if (mr.me.name == NULL) {
17,890!
564
      colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
×
565
      continue;
×
566
    }
567
    colInfoDataCleanup(pColInfoData, pBlock->info.rows);
17,890✔
568

569
    int32_t functionId = pExpr1->pExpr->_function.functionId;
17,893✔
570

571
    // this is to handle the tbname
572
    if (fmIsScanPseudoColumnFunc(functionId)) {
17,893✔
573
      int32_t fType = pExpr1->pExpr->_function.functionType;
5,500✔
574
      if (fType == FUNCTION_TYPE_TBNAME) {
5,500!
575
        STREAM_CHECK_RET_GOTO(setTbNameColData(pBlock, pColInfoData, functionId, mr.me.name));
5,507!
576
        pColInfoData->info.colId = -1;
5,495✔
577
      }
578
    } else {  // these are tags
579
      STagVal tagVal = {0};
12,388✔
580
      tagVal.cid = pExpr1->base.pParam[0].pCol->colId;
12,388✔
581
      const char* p = api->metaFn.extractTagVal(mr.me.ctbEntry.pTags, pColInfoData->info.type, &tagVal);
12,388✔
582

583
      char* data = NULL;
12,383✔
584
      if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
12,383!
585
        data = tTagValToData((const STagVal*)p, false);
12,386✔
586
      } else {
587
        data = (char*)p;
×
588
      }
589

590
      bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
12,387!
591
      if (isNullVal) {
12,387!
592
        colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
×
593
      } else {
594
        code = colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows, false);
12,387✔
595
        if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
12,392!
596
          taosMemoryFree(data);
7,176!
597
        }
598
        STREAM_CHECK_RET_GOTO(code);
12,393!
599
      }
600
    }
601
  }
602

603
end:
5,495✔
604
  api->metaReaderFn.clearReader(&mr);
5,495✔
605

606
  STREAM_PRINT_LOG_END(code, lino);
5,507!
607
  return code;
5,502✔
608
}
609

610
static int32_t processWalVerData(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamInfo, int64_t ver, bool isTrigger,
5,299✔
611
                                 int64_t uid, STimeWindow* window, SSDataBlock* pSrcBlock, SSDataBlock** pBlock) {
612
  int32_t      code = 0;
5,299✔
613
  int32_t      lino = 0;
5,299✔
614
  SFilterInfo* pFilterInfo = NULL;
5,299✔
615
  SSDataBlock* pBlock1 = NULL;
5,299✔
616
  SSDataBlock* pBlock2 = NULL;
5,299✔
617

618
  SExprInfo*   pExpr = sStreamInfo->pExprInfo;
5,299✔
619
  int32_t      numOfExpr = sStreamInfo->numOfExpr;
5,299✔
620

621
  STREAM_CHECK_RET_GOTO(filterInitFromNode(sStreamInfo->pConditions, &pFilterInfo, 0, NULL));
5,299!
622

623
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamInfo->triggerResBlock, false, &pBlock1));
5,296!
624
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamInfo->triggerResBlock, false, &pBlock2));
5,298!
625
  if (!isTrigger) STREAM_CHECK_RET_GOTO(createOneDataBlock(pSrcBlock, false, pBlock));
5,297!
626

627
  pBlock2->info.id.uid = uid;
5,296✔
628
  pBlock1->info.id.uid = uid;
5,296✔
629

630
  STREAM_CHECK_RET_GOTO(scanWalOneVer(pVnode, pBlock1, pBlock2, ver, uid, window));
5,296!
631

632
  if (pBlock2->info.rows > 0) {
5,297!
633
    SStorageAPI  api = {0};
5,297✔
634
    initStorageAPI(&api);
5,297✔
635
    STREAM_CHECK_RET_GOTO(processTag(pVnode, pExpr, numOfExpr, &api, pBlock2));
5,298!
636
  }
637
  STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock2, pFilterInfo));
5,287!
638
  if (!isTrigger) {
5,293✔
639
    blockDataTransform(*pBlock, pBlock2);
2,947✔
640
  } else {
641
    *pBlock = pBlock2;
2,346✔
642
    pBlock2 = NULL;  
2,346✔
643
  }
644

645
  printDataBlock(*pBlock, __func__, "processWalVerData2");
5,295✔
646

647
end:
5,295✔
648
  STREAM_PRINT_LOG_END(code, lino);
5,295!
649
  filterFreeInfo(pFilterInfo);
5,295✔
650
  blockDataDestroy(pBlock1);
5,297✔
651
  blockDataDestroy(pBlock2);
5,299✔
652
  return code;
5,299✔
653
}
654

655
static int32_t buildScheamFromMeta(SVnode* pVnode, int64_t uid, SArray** schemas) {
2,548✔
656
  int32_t code = 0;
2,548✔
657
  int32_t lino = 0;
2,548✔
658
  SMetaReader metaReader = {0};
2,548✔
659
  SStorageAPI api = {0};
2,548✔
660
  initStorageAPI(&api);
2,548✔
661
  *schemas = taosArrayInit(8, sizeof(SSchema));
2,549✔
662
  STREAM_CHECK_NULL_GOTO(*schemas, terrno);
2,548!
663
  
664
  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
2,548✔
665
  STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, uid));
2,549!
666

667
  SSchemaWrapper* sSchemaWrapper = NULL;
2,544✔
668
  if (metaReader.me.type == TD_CHILD_TABLE) {
2,544✔
669
    int64_t suid = metaReader.me.ctbEntry.suid;
2,410✔
670
    tDecoderClear(&metaReader.coder);
2,410✔
671
    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, suid));
2,414!
672
    sSchemaWrapper = &metaReader.me.stbEntry.schemaRow;
2,412✔
673
  } else if (metaReader.me.type == TD_NORMAL_TABLE) {
134!
674
    sSchemaWrapper = &metaReader.me.ntbEntry.schemaRow;
134✔
675
  } else {
676
    qError("invalid table type:%d", metaReader.me.type);
×
677
  }
678

679
  for (size_t j = 0; j < sSchemaWrapper->nCols; j++) {
10,090✔
680
    SSchema* s = sSchemaWrapper->pSchema + j;
7,545✔
681
    STREAM_CHECK_NULL_GOTO(taosArrayPush(*schemas, s), terrno);
15,089!
682
  }
683

684
end:
2,545✔
685
  api.metaReaderFn.clearReader(&metaReader);
2,545✔
686
  STREAM_PRINT_LOG_END(code, lino);
2,550!
687
  if (code != 0)  taosArrayDestroy(*schemas);
2,549!
688
  return code;
2,549✔
689
}
690

691
static int32_t shrinkScheams(SArray* cols, SArray* schemas) {
2,549✔
692
  int32_t code = 0;
2,549✔
693
  int32_t lino = 0;
2,549✔
694
  for (size_t i = 0; i < taosArrayGetSize(schemas); i++) {
10,100✔
695
    SSchema* s = taosArrayGet(schemas, i);
7,552✔
696
    STREAM_CHECK_NULL_GOTO(s, terrno);
7,550!
697

698
    size_t j = 0;
7,551✔
699
    for (; j < taosArrayGetSize(cols); j++) {
16,620✔
700
      col_id_t* id = taosArrayGet(cols, j);
15,870✔
701
      STREAM_CHECK_NULL_GOTO(id, terrno);
15,869!
702
      if (*id == s->colId) {
15,873✔
703
        break;
6,804✔
704
      }
705
    }
706
    if (j == taosArrayGetSize(cols)) {
7,550✔
707
      // not found, remove it
708
      taosArrayRemove(schemas, i);
747✔
709
      i--;
747✔
710
    }
711
  }
712

713
end:
2,546✔
714
  return code;
2,546✔
715
}
716

717
static int32_t processWalVerDataVTable(SVnode* pVnode, SArray *cids, int64_t ver,
2,230✔
718
  int64_t uid, STimeWindow* window, SSDataBlock** pBlock) {
719
  int32_t      code = 0;
2,230✔
720
  int32_t      lino = 0;
2,230✔
721
  SArray*      schemas = NULL;
2,230✔
722

723
  SSDataBlock* pBlock1 = NULL;
2,230✔
724
  SSDataBlock* pBlock2 = NULL;
2,230✔
725

726
  STREAM_CHECK_RET_GOTO(buildScheamFromMeta(pVnode, uid, &schemas));
2,230!
727
  STREAM_CHECK_RET_GOTO(shrinkScheams(cids, schemas));
2,230!
728
  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, &pBlock1));
2,227!
729
  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, &pBlock2));
2,231!
730

731
  pBlock2->info.id.uid = uid;
2,231✔
732
  pBlock1->info.id.uid = uid;
2,231✔
733

734
  STREAM_CHECK_RET_GOTO(scanWalOneVer(pVnode, pBlock1, pBlock2, ver, uid, window));
2,231!
735
  printDataBlock(pBlock2, __func__, "");
2,231✔
736

737
  *pBlock = pBlock2;
2,231✔
738
  pBlock2 = NULL;
2,231✔
739

740
end:
2,231✔
741
  STREAM_PRINT_LOG_END(code, lino);
2,231!
742
  blockDataDestroy(pBlock1);
2,231✔
743
  blockDataDestroy(pBlock2);
2,229✔
744
  taosArrayDestroy(schemas);
2,230✔
745
  return code;
2,230✔
746
}
747

748
static int32_t createTSAndCondition(int64_t start, int64_t end, SLogicConditionNode** pCond,
381✔
749
                                    STargetNode* pTargetNodeTs) {
750
  int32_t code = 0;
381✔
751
  int32_t lino = 0;
381✔
752

753
  SColumnNode*         pCol = NULL;
381✔
754
  SColumnNode*         pCol1 = NULL;
381✔
755
  SValueNode*          pVal = NULL;
381✔
756
  SValueNode*          pVal1 = NULL;
381✔
757
  SOperatorNode*       op = NULL;
381✔
758
  SOperatorNode*       op1 = NULL;
381✔
759
  SLogicConditionNode* cond = NULL;
381✔
760

761
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pCol));
381!
762
  pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
381✔
763
  pCol->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
381✔
764
  pCol->node.resType.bytes = LONG_BYTES;
381✔
765
  pCol->slotId = pTargetNodeTs->slotId;
381✔
766
  pCol->dataBlockId = pTargetNodeTs->dataBlockId;
381✔
767

768
  STREAM_CHECK_RET_GOTO(nodesCloneNode((SNode*)pCol, (SNode**)&pCol1));
381!
769

770
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&pVal));
381!
771
  pVal->node.resType.type = TSDB_DATA_TYPE_BIGINT;
381✔
772
  pVal->node.resType.bytes = LONG_BYTES;
381✔
773
  pVal->datum.i = start;
381✔
774
  pVal->typeData = start;
381✔
775

776
  STREAM_CHECK_RET_GOTO(nodesCloneNode((SNode*)pVal, (SNode**)&pVal1));
381!
777
  pVal1->datum.i = end;
381✔
778
  pVal1->typeData = end;
381✔
779

780
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&op));
381!
781
  op->opType = OP_TYPE_GREATER_EQUAL;
381✔
782
  op->node.resType.type = TSDB_DATA_TYPE_BOOL;
381✔
783
  op->node.resType.bytes = CHAR_BYTES;
381✔
784
  op->pLeft = (SNode*)pCol;
381✔
785
  op->pRight = (SNode*)pVal;
381✔
786
  pCol = NULL;
381✔
787
  pVal = NULL;
381✔
788

789
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&op1));
381!
790
  op1->opType = OP_TYPE_LOWER_EQUAL;
381✔
791
  op1->node.resType.type = TSDB_DATA_TYPE_BOOL;
381✔
792
  op1->node.resType.bytes = CHAR_BYTES;
381✔
793
  op1->pLeft = (SNode*)pCol1;
381✔
794
  op1->pRight = (SNode*)pVal1;
381✔
795
  pCol1 = NULL;
381✔
796
  pVal1 = NULL;
381✔
797

798
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_LOGIC_CONDITION, (SNode**)&cond));
381!
799
  cond->condType = LOGIC_COND_TYPE_AND;
381✔
800
  cond->node.resType.type = TSDB_DATA_TYPE_BOOL;
381✔
801
  cond->node.resType.bytes = CHAR_BYTES;
381✔
802
  STREAM_CHECK_RET_GOTO(nodesMakeList(&cond->pParameterList));
381!
803
  STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)op));
381!
804
  op = NULL;
381✔
805
  STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)op1));
381!
806
  op1 = NULL;
381✔
807

808
  *pCond = cond;
381✔
809

810
end:
381✔
811
  if (code != 0) {
381!
812
    nodesDestroyNode((SNode*)pCol);
×
813
    nodesDestroyNode((SNode*)pCol1);
×
814
    nodesDestroyNode((SNode*)pVal);
×
815
    nodesDestroyNode((SNode*)pVal1);
×
816
    nodesDestroyNode((SNode*)op);
×
817
    nodesDestroyNode((SNode*)op1);
×
818
    nodesDestroyNode((SNode*)cond);
×
819
  }
820
  STREAM_PRINT_LOG_END(code, lino);
381!
821

822
  return code;
381✔
823
}
824

825
static int32_t createExternalConditions(SStreamRuntimeFuncInfo* data, SLogicConditionNode** pCond, STargetNode* pTargetNodeTs, STimeRangeNode* node) {
120✔
826
  int32_t              code = 0;
120✔
827
  int32_t              lino = 0;
120✔
828
  SLogicConditionNode* pAndCondition = NULL;
120✔
829
  SLogicConditionNode* cond = NULL;
120✔
830

831
  if (pTargetNodeTs == NULL) {
120!
832
    vError("stream reader %s no ts column", __func__);
×
833
    return TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN;
×
834
  }
835
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_LOGIC_CONDITION, (SNode**)&cond));
120!
836
  cond->condType = LOGIC_COND_TYPE_OR;
120✔
837
  cond->node.resType.type = TSDB_DATA_TYPE_BOOL;
120✔
838
  cond->node.resType.bytes = CHAR_BYTES;
120✔
839
  STREAM_CHECK_RET_GOTO(nodesMakeList(&cond->pParameterList));
120!
840

841
  for (int i = 0; i < taosArrayGetSize(data->pStreamPesudoFuncVals); ++i) {
501✔
842
    data->curIdx = i;
381✔
843

844
    SReadHandle handle = {0};
381✔
845
    calcTimeRange(node, data, &handle.winRange, &handle.winRangeValid);
381✔
846
    if (!handle.winRangeValid) {
381!
847
      stError("stream reader %s invalid time range, skey:%" PRId64 ", ekey:%" PRId64, __func__, handle.winRange.skey,
×
848
              handle.winRange.ekey);
849
      continue;
×
850
    }
851
    STREAM_CHECK_RET_GOTO(createTSAndCondition(handle.winRange.skey, handle.winRange.ekey, &pAndCondition, pTargetNodeTs));
381!
852
    stDebug("%s create condition skey:%" PRId64 ", eksy:%" PRId64, __func__, handle.winRange.skey, handle.winRange.ekey);
381✔
853
    STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)pAndCondition));
381!
854
    pAndCondition = NULL;
381✔
855
  }
856

857
  *pCond = cond;
120✔
858

859
end:
120✔
860
  if (code != 0) {
120!
861
    nodesDestroyNode((SNode*)pAndCondition);
×
862
    nodesDestroyNode((SNode*)cond);
×
863
  }
864
  STREAM_PRINT_LOG_END(code, lino);
120!
865

866
  return code;
120✔
867
}
868

869
static int32_t processCalaTimeRange(SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo, SResFetchReq* req,
14,860✔
870
                                    STimeRangeNode* node, SReadHandle* handle) {
871
  int32_t code = 0;
14,860✔
872
  int32_t lino = 0;
14,860✔
873
  SArray* funcVals = NULL;
14,860✔
874
  if (req->pStRtFuncInfo->withExternalWindow) {
14,860✔
875
    nodesDestroyNode(sStreamReaderCalcInfo->tsConditions);
120✔
876
    filterFreeInfo(sStreamReaderCalcInfo->pFilterInfo);
120✔
877
    sStreamReaderCalcInfo->pFilterInfo = NULL;
120✔
878

879
    STREAM_CHECK_RET_GOTO(createExternalConditions(req->pStRtFuncInfo,
120!
880
                                                   (SLogicConditionNode**)&sStreamReaderCalcInfo->tsConditions,
881
                                                   sStreamReaderCalcInfo->pTargetNodeTs, node));
882

883
    STREAM_CHECK_RET_GOTO(filterInitFromNode((SNode*)sStreamReaderCalcInfo->tsConditions,
120!
884
                                             (SFilterInfo**)&sStreamReaderCalcInfo->pFilterInfo,
885
                                             FLT_OPTION_NO_REWRITE | FLT_OPTION_SCALAR_MODE, NULL));
886
    SSTriggerCalcParam* pFirst = taosArrayGet(req->pStRtFuncInfo->pStreamPesudoFuncVals, 0);
120✔
887
    SSTriggerCalcParam* pLast = taosArrayGetLast(req->pStRtFuncInfo->pStreamPesudoFuncVals);
120✔
888
    STREAM_CHECK_NULL_GOTO(pFirst, terrno);
120!
889
    STREAM_CHECK_NULL_GOTO(pLast, terrno);
120!
890

891
    handle->winRange.skey = pFirst->wstart;
120✔
892
    handle->winRange.ekey = pLast->wend;
120✔
893
    handle->winRangeValid = true;
120✔
894
    stDebug("%s withExternalWindow is true, skey:%" PRId64 ", ekey:%" PRId64, __func__, pFirst->wstart, pLast->wend);
120✔
895
  } else {
896
    calcTimeRange(node, req->pStRtFuncInfo, &handle->winRange, &handle->winRangeValid);
14,740✔
897
  }
898

899
end:
14,860✔
900
  taosArrayDestroy(funcVals);
14,860✔
901
  return code;
14,860✔
902
}
903

904
static int32_t createBlockForWalMeta(SSDataBlock** pBlock, bool isVTable) {
8,049✔
905
  int32_t code = 0;
8,049✔
906
  int32_t lino = 0;
8,049✔
907
  SArray* schemas = NULL;
8,049✔
908

909
  schemas = taosArrayInit(8, sizeof(SSchema));
8,049✔
910
  STREAM_CHECK_NULL_GOTO(schemas, terrno);
8,049!
911

912
  int32_t index = 0;
8,049✔
913
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TINYINT, CHAR_BYTES, index++))  // type
8,049!
914
  if (!isVTable) {
8,049✔
915
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // gid
6,924!
916
  }
917
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // uid
8,050!
918
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // skey
8,049!
919
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // ekey
8,050!
920
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // ver
8,049!
921
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // nrows
8,051!
922

923
  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, pBlock));
8,051!
924

925
end:
8,059✔
926
  taosArrayDestroy(schemas);
8,059✔
927
  return code;
8,059✔
928
}
929

930
static int32_t createOptionsForLastTs(SStreamTriggerReaderTaskInnerOptions* options,
339✔
931
                                      SStreamTriggerReaderInfo*             sStreamReaderInfo) {
932
  int32_t code = 0;
339✔
933
  int32_t lino = 0;
339✔
934
  SArray* schemas = NULL;
339✔
935

936
  schemas = taosArrayInit(4, sizeof(SSchema));
339✔
937
  STREAM_CHECK_NULL_GOTO(schemas, terrno)
340!
938
  STREAM_CHECK_RET_GOTO(
340!
939
      qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, PRIMARYKEY_TIMESTAMP_COL_ID))  // last ts
940

941
  BUILD_OPTION(op, sStreamReaderInfo, -1, true, TSDB_ORDER_DESC, INT64_MIN, INT64_MAX, schemas, true,
340✔
942
               STREAM_SCAN_GROUP_ONE_BY_ONE, 0, sStreamReaderInfo->uidList == NULL, NULL);
943
  schemas = NULL;
340✔
944
  *options = op;
340✔
945

946
end:
340✔
947
  taosArrayDestroy(schemas);
340✔
948
  return code;
339✔
949
}
950

951
static int32_t createOptionsForFirstTs(SStreamTriggerReaderTaskInnerOptions* options,
331✔
952
                                       SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t start, int64_t ver) {
953
  int32_t code = 0;
331✔
954
  int32_t lino = 0;
331✔
955
  SArray* schemas = NULL;
331✔
956

957
  schemas = taosArrayInit(4, sizeof(SSchema));
331✔
958
  STREAM_CHECK_NULL_GOTO(schemas, terrno)
331!
959
  STREAM_CHECK_RET_GOTO(
331!
960
      qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, PRIMARYKEY_TIMESTAMP_COL_ID))  // first ts
961

962
  BUILD_OPTION(op, sStreamReaderInfo, ver, true, TSDB_ORDER_ASC, start, INT64_MAX, schemas, true,
331✔
963
               STREAM_SCAN_GROUP_ONE_BY_ONE, 0, sStreamReaderInfo->uidList == NULL, NULL);
964
  schemas = NULL;
331✔
965

966
  *options = op;
331✔
967
end:
331✔
968
  taosArrayDestroy(schemas);
331✔
969
  return code;
331✔
970
}
971

972
static int32_t createOptionsForTsdbMeta(SStreamTriggerReaderTaskInnerOptions* options,
1,354✔
973
                                        SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t start, int64_t end,
974
                                        int64_t gid, int8_t order, int64_t ver, bool onlyTs) {
975
  int32_t code = 0;
1,354✔
976
  int32_t lino = 0;
1,354✔
977
  SArray* schemas = NULL;
1,354✔
978

979
  int32_t index = 1;
1,354✔
980
  schemas = taosArrayInit(8, sizeof(SSchema));
1,354✔
981
  STREAM_CHECK_NULL_GOTO(schemas, terrno);
1,354!
982
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, index++))  // skey
1,354!
983
  if (!onlyTs){
1,354✔
984
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, index++))  // ekey
677!
985
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // uid
677!
986
    if (sStreamReaderInfo->uidList == NULL) {
677✔
987
      STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_UBIGINT, LONG_BYTES, index++))  // gid
100!
988
    }
989
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))     // nrows
677!
990
  }
991
  
992
  BUILD_OPTION(op, sStreamReaderInfo, ver, true, order, start, end, schemas, true, (gid != 0 ? STREAM_SCAN_GROUP_ONE_BY_ONE : STREAM_SCAN_ALL), gid,
1,354✔
993
               true, sStreamReaderInfo->uidList);
994
  schemas = NULL;
1,354✔
995
  *options = op;
1,354✔
996

997
end:
1,354✔
998
  taosArrayDestroy(schemas);
1,354✔
999
  return code;
1,354✔
1000
}
1001

1002
static int taosCompareInt64Asc(const void* elem1, const void* elem2) {
57✔
1003
  int64_t* node1 = (int64_t*)elem1;
57✔
1004
  int64_t* node2 = (int64_t*)elem2;
57✔
1005

1006
  if (*node1 < *node2) {
57!
1007
    return -1;
×
1008
  }
1009

1010
  return *node1 > *node2;
57✔
1011
}
1012

1013
static int32_t getTableList(SArray** pList, int32_t* pNum, int64_t* suid, int32_t index,
89✔
1014
                            SStreamTriggerReaderInfo* sStreamReaderInfo) {
1015
  int32_t code = 0;
89✔
1016
  int32_t lino = 0;
89✔
1017

1018
  int32_t* start = taosArrayGet(sStreamReaderInfo->uidListIndex, index);
89✔
1019
  STREAM_CHECK_NULL_GOTO(start, terrno);
89!
1020
  int32_t  iStart = *start;
89✔
1021
  int32_t* end = taosArrayGet(sStreamReaderInfo->uidListIndex, index + 1);
89✔
1022
  STREAM_CHECK_NULL_GOTO(end, terrno);
89!
1023
  int32_t iEnd = *end;
89✔
1024
  *pList = taosArrayInit(iEnd - iStart, sizeof(STableKeyInfo));
89✔
1025
  STREAM_CHECK_NULL_GOTO(*pList, terrno);
89!
1026
  for (int32_t i = iStart; i < iEnd; ++i) {
269✔
1027
    int64_t*       uid = taosArrayGet(sStreamReaderInfo->uidList, i);
180✔
1028
    STableKeyInfo* info = taosArrayReserve(*pList, 1);
180✔
1029
    STREAM_CHECK_NULL_GOTO(info, terrno);
180!
1030
    *suid = uid[0];
180✔
1031
    info->uid = uid[1];
180✔
1032
  }
1033
  *pNum = iEnd - iStart;
89✔
1034
end:
89✔
1035
  STREAM_PRINT_LOG_END(code, lino);
89!
1036
  return code;
89✔
1037
}
1038

1039
static int32_t processTsNonVTable(SVnode* pVnode, SStreamTsResponse* tsRsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
582✔
1040
                                  SStreamReaderTaskInner* pTask) {
1041
  int32_t code = 0;
582✔
1042
  int32_t lino = 0;
582✔
1043

1044
  tsRsp->tsInfo = taosArrayInit(qStreamGetTableListGroupNum(pTask->pTableList), sizeof(STsInfo));
582✔
1045
  STREAM_CHECK_NULL_GOTO(tsRsp->tsInfo, terrno);
582!
1046
  while (true) {
383✔
1047
    bool hasNext = false;
965✔
1048
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTask, &hasNext));
965!
1049
    if (!hasNext) {
962✔
1050
      break;
580✔
1051
    }
1052
    pTask->api.tsdReader.tsdReaderReleaseDataBlock(pTask->pReader);
666✔
1053
    STsInfo* tsInfo = taosArrayReserve(tsRsp->tsInfo, 1);
666✔
1054
    STREAM_CHECK_NULL_GOTO(tsInfo, terrno)
665!
1055
    if (pTask->options.order == TSDB_ORDER_ASC) {
665✔
1056
      tsInfo->ts = pTask->pResBlock->info.window.skey;
546✔
1057
    } else {
1058
      tsInfo->ts = pTask->pResBlock->info.window.ekey;
119✔
1059
    }
1060
    tsInfo->gId = qStreamGetGroupId(pTask->pTableList, pTask->pResBlock->info.id.uid);
665✔
1061
    stDebug("vgId:%d %s get last ts:%" PRId64 ", gId:%" PRIu64 ", ver:%" PRId64, TD_VID(pVnode), __func__, tsInfo->ts,
667✔
1062
            tsInfo->gId, tsRsp->ver);
1063

1064
    pTask->currentGroupIndex++;
667✔
1065
    if (pTask->currentGroupIndex >= qStreamGetTableListGroupNum(pTask->pTableList)) {
667✔
1066
      break;
284✔
1067
    }
1068
    STREAM_CHECK_RET_GOTO(resetTsdbReader(pTask));
383!
1069
  }
1070

1071
end:
580✔
1072
  return code;
580✔
1073
}
1074

1075
static int32_t processTsVTable(SVnode* pVnode, SStreamTsResponse* tsRsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
89✔
1076
                               SStreamReaderTaskInner* pTask, int64_t ver) {
1077
  int32_t code = 0;
89✔
1078
  int32_t lino = 0;
89✔
1079
  int64_t suid = 0;
89✔
1080
  SArray* pList = NULL;
89✔
1081
  int32_t pNum = 0;
89✔
1082

1083
  tsRsp->tsInfo = taosArrayInit(taosArrayGetSize(sStreamReaderInfo->uidList), sizeof(STsInfo));
89✔
1084
  STREAM_CHECK_NULL_GOTO(tsRsp->tsInfo, terrno);
89!
1085
  for (int32_t i = 0; i < taosArrayGetSize(sStreamReaderInfo->uidListIndex) - 1; ++i) {
178✔
1086
    STREAM_CHECK_RET_GOTO(getTableList(&pList, &pNum, &suid, i, sStreamReaderInfo));
89!
1087

1088
    cleanupQueryTableDataCond(&pTask->cond);
89✔
1089
    STREAM_CHECK_RET_GOTO(qStreamInitQueryTableDataCond(&pTask->cond, pTask->options.order, pTask->options.schemas,
89!
1090
                                                        pTask->options.isSchema, pTask->options.twindows, suid, ver));
1091
    STREAM_CHECK_RET_GOTO(pTask->api.tsdReader.tsdReaderOpen(
89!
1092
        pVnode, &pTask->cond, taosArrayGet(pList, 0), pNum, pTask->pResBlock, (void**)&pTask->pReader, pTask->idStr, NULL));
1093
    taosArrayDestroy(pList);
89✔
1094
    pList = NULL;
89✔
1095
    while (true) {
116✔
1096
      bool hasNext = false;
205✔
1097
      STREAM_CHECK_RET_GOTO(getTableDataInfo(pTask, &hasNext));
205!
1098
      if (!hasNext) {
205✔
1099
        break;
89✔
1100
      }
1101
      pTask->api.tsdReader.tsdReaderReleaseDataBlock(pTask->pReader);
116✔
1102
      STsInfo* tsInfo = taosArrayReserve(tsRsp->tsInfo, 1);
116✔
1103
      STREAM_CHECK_NULL_GOTO(tsInfo, terrno)
116!
1104
      if (pTask->options.order == TSDB_ORDER_ASC) {
116✔
1105
        tsInfo->ts = pTask->pResBlock->info.window.skey;
77✔
1106
      } else {
1107
        tsInfo->ts = pTask->pResBlock->info.window.ekey;
39✔
1108
      }
1109
      tsInfo->gId = pTask->pResBlock->info.id.uid;
116✔
1110
      stDebug("vgId:%d %s get vtable last ts:%" PRId64 ", uid:%" PRIu64 ", ver:%" PRId64, TD_VID(pVnode), __func__,
116✔
1111
              tsInfo->ts, tsInfo->gId, tsRsp->ver);
1112
    }
1113
    pTask->api.tsdReader.tsdReaderClose(pTask->pReader);
89✔
1114
    pTask->pReader = NULL;
89✔
1115
  }
1116

1117
end:
89✔
1118
  taosArrayDestroy(pList);
89✔
1119
  return code;
89✔
1120
}
1121

1122
static void reSetUid(SStreamTriggerReaderTaskInnerOptions* options, int64_t suid, int64_t uid) {
355✔
1123
  if (suid != 0) options->suid = suid;
355✔
1124
  options->uid = uid;
355✔
1125
  if (options->suid != 0) {
355✔
1126
    options->tableType = TD_CHILD_TABLE;
352✔
1127
  } else {
1128
    options->tableType = TD_NORMAL_TABLE;
3✔
1129
  }
1130
}
355✔
1131

1132
static int32_t createOptionsForTsdbData(SVnode* pVnode, SStreamTriggerReaderTaskInnerOptions* options,
319✔
1133
                                        SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t uid, SArray* cols,
1134
                                        int8_t order,int64_t skey, int64_t ekey, int64_t ver) {
1135
  int32_t code = 0;
319✔
1136
  int32_t lino = 0;
319✔
1137
  SArray* schemas = NULL;
319✔
1138

1139
  STREAM_CHECK_RET_GOTO(buildScheamFromMeta(pVnode, uid, &schemas));
319!
1140
  STREAM_CHECK_RET_GOTO(shrinkScheams(cols, schemas));
319!
1141
  BUILD_OPTION(op, sStreamReaderInfo, ver, true, order, skey, ekey, schemas, true, STREAM_SCAN_ALL, 0, false, NULL);
319✔
1142
  *options = op;
319✔
1143

1144
end:
319✔
1145
  return code;
319✔
1146
}
1147

1148
static int32_t vnodeProcessStreamSetTableReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
66✔
1149
  int32_t code = 0;
66✔
1150
  int32_t lino = 0;
66✔
1151
  void*   buf = NULL;
66✔
1152
  size_t  size = 0;
66✔
1153
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
66!
1154
  void* pTask = sStreamReaderInfo->pTask;
66✔
1155

1156
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
66✔
1157

1158
  TSWAP(sStreamReaderInfo->uidList, req->setTableReq.uids);
66✔
1159
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidList, TSDB_CODE_INVALID_PARA);
66!
1160

1161
  taosArraySort(sStreamReaderInfo->uidList, taosCompareInt64Asc);
66✔
1162
  if (sStreamReaderInfo->uidListIndex != NULL) {
66!
1163
    taosArrayClear(sStreamReaderInfo->uidListIndex);
×
1164
  } else {
1165
    sStreamReaderInfo->uidListIndex = taosArrayInit(taosArrayGetSize(sStreamReaderInfo->uidList), sizeof(int32_t));
66✔
1166
    STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidListIndex, TSDB_CODE_INVALID_PARA);
66!
1167
  }
1168

1169
  if (sStreamReaderInfo->uidHash != NULL) {
66!
1170
    taosHashClear(sStreamReaderInfo->uidHash);
×
1171
  } else {
1172
    sStreamReaderInfo->uidHash = taosHashInit(taosArrayGetSize(sStreamReaderInfo->uidList),
66✔
1173
                                              taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
1174
    STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidHash, TSDB_CODE_INVALID_PARA);
66!
1175
  }
1176

1177
  int64_t suid = 0;
66✔
1178
  int32_t cnt = taosArrayGetSize(sStreamReaderInfo->uidList);
66✔
1179
  for (int32_t i = 0; i < cnt; ++i) {
179✔
1180
    int64_t* data = taosArrayGet(sStreamReaderInfo->uidList, i);
113✔
1181
    STREAM_CHECK_NULL_GOTO(data, terrno);
113!
1182
    if (*data == 0 || *data != suid) {
113✔
1183
      STREAM_CHECK_NULL_GOTO(taosArrayPush(sStreamReaderInfo->uidListIndex, &i), terrno);
132!
1184
    }
1185
    suid = *data;
113✔
1186
    stDebug("vgId:%d %s suid:%" PRId64 ",uid:%" PRId64 ", index:%d", TD_VID(pVnode), __func__, suid, data[1], i);
113✔
1187
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->uidHash, data + 1, LONG_BYTES, &i, sizeof(int32_t)));
113!
1188
  }
1189
  STREAM_CHECK_NULL_GOTO(taosArrayPush(sStreamReaderInfo->uidListIndex, &cnt), terrno);
132!
1190

1191
end:
66✔
1192
  STREAM_PRINT_LOG_END_WITHID(code, lino);
66!
1193
  SRpcMsg rsp = {
66✔
1194
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1195
  tmsgSendRsp(&rsp);
66✔
1196
  return code;
66✔
1197
}
1198

1199
static int32_t vnodeProcessStreamLastTsReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
337✔
1200
  int32_t                 code = 0;
337✔
1201
  int32_t                 lino = 0;
337✔
1202
  SArray*                 schemas = NULL;
337✔
1203
  SStreamReaderTaskInner* pTaskInner = NULL;
337✔
1204
  SStreamTsResponse       lastTsRsp = {0};
337✔
1205
  void*                   buf = NULL;
337✔
1206
  size_t                  size = 0;
337✔
1207

1208
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
337!
1209
  void* pTask = sStreamReaderInfo->pTask;
337✔
1210

1211
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
337✔
1212

1213
  SStreamTriggerReaderTaskInnerOptions options = {0};
337✔
1214
  STREAM_CHECK_RET_GOTO(createOptionsForLastTs(&options, sStreamReaderInfo));
337!
1215
  SStorageAPI api = {0};
340✔
1216
  initStorageAPI(&api);
340✔
1217
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, NULL, &api));
340!
1218

1219
  lastTsRsp.ver = pVnode->state.applied;
340✔
1220
  if (sStreamReaderInfo->uidList != NULL) {
340✔
1221
    STREAM_CHECK_RET_GOTO(processTsVTable(pVnode, &lastTsRsp, sStreamReaderInfo, pTaskInner, -1));
61!
1222
  } else {
1223
    STREAM_CHECK_RET_GOTO(processTsNonVTable(pVnode, &lastTsRsp, sStreamReaderInfo, pTaskInner));
279!
1224
  }
1225
  ST_TASK_DLOG("vgId:%d %s get result", TD_VID(pVnode), __func__);
338✔
1226
  STREAM_CHECK_RET_GOTO(buildTsRsp(&lastTsRsp, &buf, &size))
339!
1227

1228
end:
340✔
1229
  STREAM_PRINT_LOG_END_WITHID(code, lino);
340!
1230
  SRpcMsg rsp = {
340✔
1231
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1232
  tmsgSendRsp(&rsp);
340✔
1233
  taosArrayDestroy(lastTsRsp.tsInfo);
339✔
1234
  releaseStreamTask(&pTaskInner);
340✔
1235
  return code;
340✔
1236
}
1237

1238
static int32_t vnodeProcessStreamFirstTsReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
331✔
1239
  int32_t                 code = 0;
331✔
1240
  int32_t                 lino = 0;
331✔
1241
  SStreamReaderTaskInner* pTaskInner = NULL;
331✔
1242
  SStreamTsResponse       firstTsRsp = {0};
331✔
1243
  void*                   buf = NULL;
331✔
1244
  size_t                  size = 0;
331✔
1245

1246
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
331!
1247
  void* pTask = sStreamReaderInfo->pTask;
331✔
1248
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
331✔
1249
  SStreamTriggerReaderTaskInnerOptions options = {0};
331✔
1250
  STREAM_CHECK_RET_GOTO(createOptionsForFirstTs(&options, sStreamReaderInfo, req->firstTsReq.startTime, req->firstTsReq.ver));
331!
1251
  SStorageAPI api = {0};
331✔
1252
  initStorageAPI(&api);
331✔
1253
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, NULL, &api));
331!
1254
  
1255
  firstTsRsp.ver = pVnode->state.applied;
331✔
1256
  if (sStreamReaderInfo->uidList != NULL) {
331✔
1257
    STREAM_CHECK_RET_GOTO(processTsVTable(pVnode, &firstTsRsp, sStreamReaderInfo, pTaskInner, req->firstTsReq.ver));
28!
1258
  } else {
1259
    STREAM_CHECK_RET_GOTO(processTsNonVTable(pVnode, &firstTsRsp, sStreamReaderInfo, pTaskInner));
303!
1260
  }
1261

1262
  ST_TASK_DLOG("vgId:%d %s get result", TD_VID(pVnode), __func__);
331✔
1263
  STREAM_CHECK_RET_GOTO(buildTsRsp(&firstTsRsp, &buf, &size));
331!
1264

1265
end:
331✔
1266
  STREAM_PRINT_LOG_END_WITHID(code, lino);
331!
1267
  SRpcMsg rsp = {
331✔
1268
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1269
  tmsgSendRsp(&rsp);
331✔
1270
  taosArrayDestroy(firstTsRsp.tsInfo);
331✔
1271
  releaseStreamTask(&pTaskInner);
331✔
1272
  return code;
331✔
1273
}
1274

1275
static int32_t vnodeProcessStreamTsdbMetaReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
677✔
1276
  int32_t code = 0;
677✔
1277
  int32_t lino = 0;
677✔
1278
  void*   buf = NULL;
677✔
1279
  size_t  size = 0;
677✔
1280
  SStreamTriggerReaderTaskInnerOptions options = {0};
677✔
1281

1282
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
677!
1283
  void* pTask = sStreamReaderInfo->pTask;
677✔
1284
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
677✔
1285

1286
  SStreamReaderTaskInner* pTaskInner = NULL;
677✔
1287
  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_META);
677✔
1288

1289
  if (req->base.type == STRIGGER_PULL_TSDB_META) {
677!
1290
    SStreamTriggerReaderTaskInnerOptions optionsTs = {0};
677✔
1291

1292
    STREAM_CHECK_RET_GOTO(createOptionsForTsdbMeta(&optionsTs, sStreamReaderInfo, req->tsdbMetaReq.startTime,
677!
1293
      req->tsdbMetaReq.endTime, req->tsdbMetaReq.gid, req->tsdbMetaReq.order, req->tsdbMetaReq.ver, true));
1294
    SStorageAPI api = {0};
677✔
1295
    initStorageAPI(&api);
677✔
1296
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &optionsTs, &pTaskInner, NULL, sStreamReaderInfo->groupIdMap, &api));
677!
1297
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
677!
1298
    
1299
    STREAM_CHECK_RET_GOTO(createOptionsForTsdbMeta(&options, sStreamReaderInfo, req->tsdbMetaReq.startTime,
677!
1300
      req->tsdbMetaReq.endTime, req->tsdbMetaReq.gid, req->tsdbMetaReq.order, req->tsdbMetaReq.ver, false));
1301
    STREAM_CHECK_RET_GOTO(createDataBlockForStream(options.schemas, &pTaskInner->pResBlockDst));
677!
1302
  } else {
1303
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
×
1304
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
×
1305
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
×
1306
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
×
1307
  }
1308

1309
  pTaskInner->pResBlockDst->info.rows = 0;
677✔
1310
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pTaskInner->pResBlockDst, STREAM_RETURN_ROWS_NUM));
677!
1311
  bool hasNext = true;
677✔
1312
  while (true) {
162✔
1313
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
839!
1314
    if (!hasNext) {
840✔
1315
      break;
677✔
1316
    }
1317
    pTaskInner->api.tsdReader.tsdReaderReleaseDataBlock(pTaskInner->pReader);
163✔
1318
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
163✔
1319

1320
    int32_t index = 0;
163✔
1321
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.window.skey));
163!
1322
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.window.ekey));
163!
1323
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.id.uid));
163!
1324
    if (sStreamReaderInfo->uidList == NULL) {
162✔
1325
      STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.id.groupId));
30!
1326
    }
1327
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.rows));
162!
1328

1329
    stDebug("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
162✔
1330
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
1331
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
1332
            pTaskInner->pResBlockDst->info.rows++;
162✔
1333
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
162!
1334
      break;
×
1335
    }
1336
  }
1337

1338
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
677✔
1339
  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
677!
1340
  if (!hasNext) {
676!
1341
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
676!
1342
  }
1343

1344
end:
677✔
1345
  STREAM_PRINT_LOG_END_WITHID(code, lino);
677!
1346
  SRpcMsg rsp = {
677✔
1347
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1348
  tmsgSendRsp(&rsp);
677✔
1349
  taosArrayDestroy(options.schemas);
677✔
1350
  return code;
677✔
1351
}
1352

1353
static int32_t vnodeProcessStreamTsdbTsDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
36✔
1354
  int32_t                 code = 0;
36✔
1355
  int32_t                 lino = 0;
36✔
1356
  SStreamReaderTaskInner* pTaskInner = NULL;
36✔
1357
  void*                   buf = NULL;
36✔
1358
  size_t                  size = 0;
36✔
1359
  SSDataBlock*            pBlockRes = NULL;
36✔
1360

1361
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
36!
1362
  void* pTask = sStreamReaderInfo->pTask;
36✔
1363
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
36!
1364

1365
  BUILD_OPTION(options, sStreamReaderInfo, req->tsdbTsDataReq.ver, true, TSDB_ORDER_ASC, req->tsdbTsDataReq.skey, req->tsdbTsDataReq.ekey,
36✔
1366
               sStreamReaderInfo->triggerCols, false, STREAM_SCAN_ALL, 0, true, NULL);
1367
  reSetUid(&options, req->tsdbTsDataReq.suid, req->tsdbTsDataReq.uid);
36✔
1368
  SStorageAPI api = {0};
36✔
1369
  initStorageAPI(&api);
36✔
1370
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, NULL, &api));
36!
1371
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
36!
1372
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->tsBlock, false, &pBlockRes));
36!
1373

1374
  while (1) {
36✔
1375
    bool hasNext = false;
72✔
1376
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
72!
1377
    if (!hasNext) {
72✔
1378
      break;
36✔
1379
    }
1380
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
36✔
1381

1382
    SSDataBlock* pBlock = NULL;
36✔
1383
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
36!
1384
    if (pBlock != NULL && pBlock->info.rows > 0) {
36!
1385
      STREAM_CHECK_RET_GOTO(processTag(pVnode, sStreamReaderInfo->pExprInfo, sStreamReaderInfo->numOfExpr, &api, pBlock));
36!
1386
    }
1387
    
1388
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, pTaskInner->pFilterInfo));
36!
1389
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
36!
1390
    ST_TASK_DLOG("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
36!
1391
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
1392
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
1393
  }
1394

1395
  blockDataTransform(pBlockRes, pTaskInner->pResBlockDst);
36✔
1396

1397
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
36!
1398
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));
36!
1399

1400
end:
36✔
1401
  STREAM_PRINT_LOG_END_WITHID(code, lino);
36!
1402
  SRpcMsg rsp = {
36✔
1403
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1404
  tmsgSendRsp(&rsp);
36✔
1405
  blockDataDestroy(pBlockRes);
36✔
1406

1407
  releaseStreamTask(&pTaskInner);
36✔
1408
  return code;
36✔
1409
}
1410

1411
static int32_t vnodeProcessStreamTsdbTriggerDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
341✔
1412
  int32_t code = 0;
341✔
1413
  int32_t lino = 0;
341✔
1414
  void*   buf = NULL;
341✔
1415
  size_t  size = 0;
341✔
1416

1417
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
341!
1418
  SStreamReaderTaskInner* pTaskInner = NULL;
341✔
1419
  void* pTask = sStreamReaderInfo->pTask;
341✔
1420
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
341✔
1421
  
1422
  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_TRIGGER_DATA);
341✔
1423

1424
  if (req->base.type == STRIGGER_PULL_TSDB_TRIGGER_DATA) {
341✔
1425
    BUILD_OPTION(options, sStreamReaderInfo, req->tsdbTriggerDataReq.ver, true, req->tsdbTriggerDataReq.order, req->tsdbTriggerDataReq.startTime, INT64_MAX,
165✔
1426
                 sStreamReaderInfo->triggerCols, false, (req->tsdbTriggerDataReq.gid != 0 ? STREAM_SCAN_GROUP_ONE_BY_ONE : STREAM_SCAN_ALL), 
1427
                 req->tsdbTriggerDataReq.gid, true, NULL);
1428
    SStorageAPI api = {0};
165✔
1429
    initStorageAPI(&api);
165✔
1430
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock,
165!
1431
                                           sStreamReaderInfo->groupIdMap, &api));
1432

1433
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
165!
1434

1435
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
165!
1436

1437
  } else {
1438
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
176✔
1439
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
176!
1440
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
176✔
1441
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
176!
1442
  }
1443

1444
  pTaskInner->pResBlockDst->info.rows = 0;
341✔
1445
  bool hasNext = true;
341✔
1446
  while (1) {
×
1447
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
341!
1448
    if (!hasNext) {
341✔
1449
      break;
165✔
1450
    }
1451
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
176✔
1452
    pTaskInner->pResBlockDst->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
176✔
1453

1454
    SSDataBlock* pBlock = NULL;
176✔
1455
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
176!
1456
    if (pBlock != NULL && pBlock->info.rows > 0) {
176!
1457
      STREAM_CHECK_RET_GOTO(
176!
1458
        processTag(pVnode, sStreamReaderInfo->pExprInfo, sStreamReaderInfo->numOfExpr, &pTaskInner->api, pBlock));
1459
    }
1460
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, pTaskInner->pFilterInfo));
176!
1461
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
176!
1462
    ST_TASK_DLOG("vgId:%d %s get skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
176✔
1463
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
1464
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
1465
    if (pTaskInner->pResBlockDst->info.rows > 0) {
176!
1466
      break;
176✔
1467
    }
1468
  }
1469

1470
  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
341!
1471
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
341✔
1472
  if (!hasNext) {
341✔
1473
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
165!
1474
  }
1475

1476
end:
341✔
1477
  STREAM_PRINT_LOG_END_WITHID(code, lino);
341!
1478
  SRpcMsg rsp = {
341✔
1479
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1480
  tmsgSendRsp(&rsp);
341✔
1481

1482
  return code;
341✔
1483
}
1484

1485
static int32_t vnodeProcessStreamTsdbCalcDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
163,420✔
1486
  int32_t code = 0;
163,420✔
1487
  int32_t lino = 0;
163,420✔
1488
  void*   buf = NULL;
163,420✔
1489
  size_t  size = 0;
163,420✔
1490
  SSDataBlock*            pBlockRes = NULL;
163,420✔
1491

1492
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
163,420!
1493
  void* pTask = sStreamReaderInfo->pTask;
163,420✔
1494
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
163,420✔
1495

1496
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->triggerCols, TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN);
163,420!
1497

1498
  SStreamReaderTaskInner* pTaskInner = NULL;
163,420✔
1499
  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_CALC_DATA);
163,420✔
1500

1501
  if (req->base.type == STRIGGER_PULL_TSDB_CALC_DATA) {
163,420!
1502
    BUILD_OPTION(options, sStreamReaderInfo, req->tsdbCalcDataReq.ver, true, TSDB_ORDER_ASC, req->tsdbCalcDataReq.skey, req->tsdbCalcDataReq.ekey,
163,420✔
1503
                 sStreamReaderInfo->triggerCols, false, STREAM_SCAN_GROUP_ONE_BY_ONE, req->tsdbCalcDataReq.gid, true, NULL);
1504
    SStorageAPI api = {0};
163,420✔
1505
    initStorageAPI(&api);
163,420✔
1506
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, NULL, &api));
163,419✔
1507

1508
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
163,310!
1509
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
163,310!
1510
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->calcResBlock, false, &pBlockRes));
163,308!
1511
  } else {
1512
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
×
1513
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
×
1514
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
×
1515
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
×
1516
  }
1517

1518
  pTaskInner->pResBlockDst->info.rows = 0;
163,309✔
1519
  bool hasNext = true;
163,309✔
1520
  while (1) {
2,572✔
1521
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
165,881!
1522
    if (!hasNext) {
165,869✔
1523
      break;
163,300✔
1524
    }
1525
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
2,569✔
1526

1527
    SSDataBlock* pBlock = NULL;
2,572✔
1528
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
2,572!
1529
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, pTaskInner->pFilterInfo));
2,572!
1530
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
2,571!
1531
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
2,572!
1532
      break;
×
1533
    }
1534
  }
1535
  blockDataTransform(pBlockRes, pTaskInner->pResBlockDst);
163,300✔
1536
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));
163,298!
1537
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlockRes->info.rows);
163,291✔
1538
  if (!hasNext) {
163,309!
1539
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
163,309!
1540
  }
1541

1542
end:
163,305✔
1543
  STREAM_PRINT_LOG_END_WITHID(code, lino);
163,415!
1544
  SRpcMsg rsp = {
163,419✔
1545
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1546
  tmsgSendRsp(&rsp);
163,419✔
1547
  blockDataDestroy(pBlockRes);
163,418✔
1548
  return code;
163,414✔
1549
}
1550

1551
static int32_t vnodeProcessStreamTsdbVirtalDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
319✔
1552
  int32_t code = 0;
319✔
1553
  int32_t lino = 0;
319✔
1554
  void*   buf = NULL;
319✔
1555
  size_t  size = 0;
319✔
1556

1557
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
319!
1558
  void* pTask = sStreamReaderInfo->pTask;
319✔
1559
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
319✔
1560

1561
  SStreamReaderTaskInner* pTaskInner = NULL;
319✔
1562
  int64_t key = req->tsdbDataReq.uid;
319✔
1563

1564
  if (req->base.type == STRIGGER_PULL_TSDB_DATA) {
319!
1565
    SStreamTriggerReaderTaskInnerOptions options = {0};
319✔
1566

1567
    STREAM_CHECK_RET_GOTO(createOptionsForTsdbData(pVnode, &options, sStreamReaderInfo, req->tsdbDataReq.uid,
319!
1568
                                                   req->tsdbDataReq.cids, req->tsdbDataReq.order, req->tsdbDataReq.skey,
1569
                                                   req->tsdbDataReq.ekey, req->tsdbDataReq.ver));
1570
    reSetUid(&options, req->tsdbDataReq.suid, req->tsdbDataReq.uid);
319✔
1571

1572
    SStorageAPI api = {0};
319✔
1573
    initStorageAPI(&api);
319✔
1574
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, NULL, &api));
319!
1575

1576
    STableKeyInfo       keyInfo = {.uid = req->tsdbDataReq.uid};
319✔
1577
    cleanupQueryTableDataCond(&pTaskInner->cond);
319✔
1578
    STREAM_CHECK_RET_GOTO(qStreamInitQueryTableDataCond(&pTaskInner->cond, pTaskInner->options.order, pTaskInner->options.schemas,
319!
1579
                                                        pTaskInner->options.isSchema, pTaskInner->options.twindows,
1580
                                                        pTaskInner->options.suid, pTaskInner->options.ver));
1581
    STREAM_CHECK_RET_GOTO(pTaskInner->api.tsdReader.tsdReaderOpen(pVnode, &pTaskInner->cond, &keyInfo, 1, pTaskInner->pResBlock,
319!
1582
                                                             (void**)&pTaskInner->pReader, pTaskInner->idStr, NULL));
1583

1584
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
319!
1585
    STREAM_CHECK_RET_GOTO(createOneDataBlock(pTaskInner->pResBlock, false, &pTaskInner->pResBlockDst));
319!
1586
  } else {
1587
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
×
1588
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
×
1589
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
×
1590
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
×
1591
  }
1592

1593
  pTaskInner->pResBlockDst->info.rows = 0;
319✔
1594
  bool hasNext = true;
319✔
1595
  while (1) {
319✔
1596
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
638!
1597
    if (!hasNext) {
638✔
1598
      break;
319✔
1599
    }
1600

1601
    SSDataBlock* pBlock = NULL;
319✔
1602
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
319!
1603
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
319!
1604
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
319!
1605
      break;
×
1606
    }
1607
  }
1608
  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
319!
1609
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
319✔
1610
  if (!hasNext) {
319!
1611
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
319!
1612
  }
1613

1614
end:
319✔
1615
  STREAM_PRINT_LOG_END_WITHID(code, lino);
319!
1616
  SRpcMsg rsp = {
319✔
1617
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1618
  tmsgSendRsp(&rsp);
319✔
1619

1620
  return code;
319✔
1621
}
1622

1623
static int32_t vnodeProcessStreamWalMetaReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
8,036✔
1624
  int32_t      code = 0;
8,036✔
1625
  int32_t      lino = 0;
8,036✔
1626
  void*        buf = NULL;
8,036✔
1627
  size_t       size = 0;
8,036✔
1628
  SSDataBlock* pBlock = NULL;
8,036✔
1629
  void*        pTableList = NULL;
8,036✔
1630
  SNodeList*   groupNew = NULL;
8,036✔
1631
  int64_t      lastVer = 0;
8,036✔
1632

1633
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
8,036✔
1634
  void* pTask = sStreamReaderInfo->pTask;
8,030✔
1635
  ST_TASK_DLOG("vgId:%d %s start, request paras lastVer:%" PRId64 ",ctime:%" PRId64, TD_VID(pVnode), __func__,
8,030✔
1636
  req->walMetaReq.lastVer, req->walMetaReq.ctime);
1637

1638
  bool isVTable = sStreamReaderInfo->uidList != NULL;
8,030✔
1639
  if (sStreamReaderInfo->uidList == NULL) {
8,030✔
1640
    SStorageAPI api = {0};
6,911✔
1641
    initStorageAPI(&api);
6,911✔
1642
    STREAM_CHECK_RET_GOTO(nodesCloneList(sStreamReaderInfo->partitionCols, &groupNew));
6,920!
1643
    STREAM_CHECK_RET_GOTO(qStreamCreateTableListForReader(
6,915!
1644
        pVnode, sStreamReaderInfo->suid, sStreamReaderInfo->uid, sStreamReaderInfo->tableType,
1645
        groupNew, false, sStreamReaderInfo->pTagCond, sStreamReaderInfo->pTagIndexCond, &api,
1646
        &pTableList, sStreamReaderInfo->groupIdMap));
1647
  }
1648

1649
  STREAM_CHECK_RET_GOTO(createBlockForWalMeta(&pBlock, isVTable));
8,039!
1650
  STREAM_CHECK_RET_GOTO(scanWal(pVnode, isVTable ? sStreamReaderInfo->uidHash : pTableList, isVTable, pBlock,
8,058!
1651
                                req->walMetaReq.lastVer, sStreamReaderInfo->deleteReCalc,
1652
                                sStreamReaderInfo->deleteOutTbl, req->walMetaReq.ctime, &lastVer));
1653

1654
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlock->info.rows);
8,060✔
1655
  STREAM_CHECK_RET_GOTO(buildRsp(pBlock, &buf, &size));
8,060!
1656
  printDataBlock(pBlock, __func__, "");
8,056✔
1657

1658
end:
8,058✔
1659
  if (pBlock != NULL && pBlock->info.rows == 0) {
8,058!
1660
    code = TSDB_CODE_STREAM_NO_DATA;
7,676✔
1661
    buf = rpcMallocCont(sizeof(int64_t));
7,676✔
1662
    *(int64_t *)buf = lastVer;
7,673✔
1663
    size = sizeof(int64_t);
7,673✔
1664
  }
1665
  SRpcMsg rsp = {
8,055✔
1666
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1667
  tmsgSendRsp(&rsp);
8,055✔
1668
  if (code == TSDB_CODE_STREAM_NO_DATA){
8,068✔
1669
    code = 0;
7,679✔
1670
  }
1671
  STREAM_PRINT_LOG_END_WITHID(code, lino);
8,068!
1672
  nodesDestroyList(groupNew);
8,068✔
1673
  blockDataDestroy(pBlock);
8,061✔
1674
  qStreamDestroyTableList(pTableList);
8,068✔
1675

1676
  return code;
8,062✔
1677
}
1678

1679
static int32_t vnodeProcessStreamWalDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
7,528✔
1680
  int32_t      code = 0;
7,528✔
1681
  int32_t      lino = 0;
7,528✔
1682
  void*        buf = NULL;
7,528✔
1683
  size_t       size = 0;
7,528✔
1684
  SSDataBlock* pBlock = NULL;
7,528✔
1685

1686
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
7,528!
1687
  void* pTask = sStreamReaderInfo->pTask;
7,528✔
1688
  ST_TASK_DLOG("vgId:%d %s start, request type:%d skey:%" PRId64 ",ekey:%" PRId64 ",uid:%" PRId64 ",ver:%" PRId64, TD_VID(pVnode),
7,528✔
1689
  __func__, req->walDataReq.base.type, req->walDataReq.skey, req->walDataReq.ekey, req->walDataReq.uid, req->walDataReq.ver);
1690

1691
  STimeWindow window = {.skey = req->walDataReq.skey, .ekey = req->walDataReq.ekey};
7,528✔
1692
  if (req->walDataReq.base.type == STRIGGER_PULL_WAL_DATA){
7,528✔
1693
    STREAM_CHECK_RET_GOTO(processWalVerDataVTable(pVnode, req->walDataReq.cids, req->walDataReq.ver,
2,230!
1694
      req->walDataReq.uid, &window, &pBlock));
1695
  } else if (req->walDataReq.base.type == STRIGGER_PULL_WAL_CALC_DATA){
5,298✔
1696
    STREAM_CHECK_RET_GOTO(processWalVerData(pVnode, sStreamReaderInfo, req->walDataReq.ver, false,
2,745!
1697
      req->walDataReq.uid, &window, sStreamReaderInfo->calcResBlock, &pBlock));
1698
  } else if (req->walDataReq.base.type == STRIGGER_PULL_WAL_TRIGGER_DATA) {
2,553✔
1699
    STREAM_CHECK_RET_GOTO(processWalVerData(pVnode, sStreamReaderInfo, req->walDataReq.ver, true,
2,350!
1700
      req->walDataReq.uid, &window, sStreamReaderInfo->triggerResBlock, &pBlock));
1701
  } else if (req->walDataReq.base.type == STRIGGER_PULL_WAL_TS_DATA){
203!
1702
    STREAM_CHECK_RET_GOTO(processWalVerData(pVnode, sStreamReaderInfo, req->walDataReq.ver, false,
203!
1703
      req->walDataReq.uid, &window, sStreamReaderInfo->tsBlock, &pBlock));
1704

1705
  }
1706
  
1707
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlock->info.rows);
7,526✔
1708
  printDataBlock(pBlock, __func__, "");
7,526✔
1709
  STREAM_CHECK_RET_GOTO(buildRsp(pBlock, &buf, &size));
7,524!
1710
end:
7,517✔
1711
  STREAM_PRINT_LOG_END_WITHID(code, lino);
7,517!
1712
  SRpcMsg rsp = {
7,517✔
1713
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1714
  tmsgSendRsp(&rsp);
7,517✔
1715

1716
  blockDataDestroy(pBlock);
7,529✔
1717
  return code;
7,528✔
1718
}
1719

1720
static int32_t vnodeProcessStreamGroupColValueReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
594✔
1721
  int32_t code = 0;
594✔
1722
  int32_t lino = 0;
594✔
1723
  void*   buf = NULL;
594✔
1724
  size_t  size = 0;
594✔
1725

1726
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
594!
1727
  void* pTask = sStreamReaderInfo->pTask;
594✔
1728
  ST_TASK_DLOG("vgId:%d %s start, request gid:%" PRId64, TD_VID(pVnode), __func__, req->groupColValueReq.gid);
594✔
1729

1730
  SArray** gInfo = taosHashGet(sStreamReaderInfo->groupIdMap, &req->groupColValueReq.gid, POINTER_BYTES);
594✔
1731
  STREAM_CHECK_NULL_GOTO(gInfo, TSDB_CODE_STREAM_NO_CONTEXT);
595!
1732
  SStreamGroupInfo pGroupInfo = {0};
595✔
1733
  pGroupInfo.gInfo = *gInfo;
595✔
1734

1735
  size = tSerializeSStreamGroupInfo(NULL, 0, &pGroupInfo, TD_VID(pVnode));
595✔
1736
  STREAM_CHECK_CONDITION_GOTO(size < 0, size);
1737
  buf = rpcMallocCont(size);
595✔
1738
  STREAM_CHECK_NULL_GOTO(buf, terrno);
595!
1739
  size = tSerializeSStreamGroupInfo(buf, size, &pGroupInfo, TD_VID(pVnode));
595✔
1740
  STREAM_CHECK_CONDITION_GOTO(size < 0, size);
1741
end:
595✔
1742
  if (code != 0) {
595!
1743
    rpcFreeCont(buf);
×
1744
    buf = NULL;
×
1745
    size = 0;
×
1746
  }
1747
  STREAM_PRINT_LOG_END_WITHID(code, lino);
595!
1748
  SRpcMsg rsp = {
595✔
1749
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1750
  tmsgSendRsp(&rsp);
595✔
1751

1752
  return code;
595✔
1753
}
1754

1755
static int32_t vnodeProcessStreamVTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
178✔
1756
  int32_t              code = 0;
178✔
1757
  int32_t              lino = 0;
178✔
1758
  void*                buf = NULL;
178✔
1759
  size_t               size = 0;
178✔
1760
  SStreamMsgVTableInfo vTableInfo = {0};
178✔
1761
  SMetaReader          metaReader = {0};
178✔
1762
  SNodeList*           groupNew = NULL;
178✔
1763
  void*                pTableList = NULL;
178✔
1764
  SStorageAPI api = {0};
178✔
1765
  initStorageAPI(&api);
178✔
1766

1767
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
178!
1768
  void* pTask = sStreamReaderInfo->pTask;
178✔
1769
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
178✔
1770

1771
  STREAM_CHECK_RET_GOTO(nodesCloneList(sStreamReaderInfo->partitionCols, &groupNew));
178!
1772
  STREAM_CHECK_RET_GOTO(qStreamCreateTableListForReader(
178!
1773
      pVnode, sStreamReaderInfo->suid, sStreamReaderInfo->uid, sStreamReaderInfo->tableType,
1774
      groupNew, true, sStreamReaderInfo->pTagCond, sStreamReaderInfo->pTagIndexCond, &api,
1775
      &pTableList, sStreamReaderInfo->groupIdMap));
1776

1777
  SArray* cids = req->virTableInfoReq.cids;
178✔
1778
  STREAM_CHECK_NULL_GOTO(cids, terrno);
178!
1779

1780
  SArray* pTableListArray = qStreamGetTableArrayList(pTableList);
178✔
1781
  STREAM_CHECK_NULL_GOTO(pTableListArray, terrno);
178!
1782

1783
  vTableInfo.infos = taosArrayInit(taosArrayGetSize(pTableListArray), sizeof(VTableInfo));
178✔
1784
  STREAM_CHECK_NULL_GOTO(vTableInfo.infos, terrno);
178!
1785
  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
178✔
1786

1787
  for (size_t i = 0; i < taosArrayGetSize(pTableListArray); i++) {
471✔
1788
    STableKeyInfo* pKeyInfo = taosArrayGet(pTableListArray, i);
294✔
1789
    if (pKeyInfo == NULL) {
294!
1790
      continue;
×
1791
    }
1792
    VTableInfo* vTable = taosArrayReserve(vTableInfo.infos, 1);
294✔
1793
    STREAM_CHECK_NULL_GOTO(vTable, terrno);
294!
1794
    vTable->uid = pKeyInfo->uid;
294✔
1795
    vTable->gId = pKeyInfo->groupId;
294✔
1796

1797
    code = api.metaReaderFn.getTableEntryByUid(&metaReader, pKeyInfo->uid);
294✔
1798
    if (taosArrayGetSize(cids) == 1 && *(col_id_t*)taosArrayGet(cids, 0) == PRIMARYKEY_TIMESTAMP_COL_ID){
293!
1799
      vTable->cols.nCols = metaReader.me.colRef.nCols;
×
1800
      vTable->cols.version = metaReader.me.colRef.version;
×
1801
      vTable->cols.pColRef = taosMemoryCalloc(metaReader.me.colRef.nCols, sizeof(SColRef));
×
1802
      for (size_t j = 0; j < metaReader.me.colRef.nCols; j++) {
×
1803
        memcpy(vTable->cols.pColRef + j, &metaReader.me.colRef.pColRef[j], sizeof(SColRef));
×
1804
      }
1805
    } else {
1806
      vTable->cols.nCols = taosArrayGetSize(cids);
293✔
1807
      vTable->cols.version = metaReader.me.colRef.version;
293✔
1808
      vTable->cols.pColRef = taosMemoryCalloc(taosArrayGetSize(cids), sizeof(SColRef));
293!
1809
      for (size_t i = 0; i < taosArrayGetSize(cids); i++) {
1,217✔
1810
        for (size_t j = 0; j < metaReader.me.colRef.nCols; j++) {
3,142✔
1811
          if (metaReader.me.colRef.pColRef[j].hasRef &&
2,784✔
1812
              metaReader.me.colRef.pColRef[j].id == *(col_id_t*)taosArrayGet(cids, i)) {
1,689✔
1813
            memcpy(vTable->cols.pColRef + i, &metaReader.me.colRef.pColRef[j], sizeof(SColRef));
566✔
1814
            break;
566✔
1815
          }
1816
        }
1817
      }
1818
    }
1819
    tDecoderClear(&metaReader.coder);
291✔
1820
  }
1821
  ST_TASK_DLOG("vgId:%d %s end", TD_VID(pVnode), __func__);
177✔
1822
  STREAM_CHECK_RET_GOTO(buildVTableInfoRsp(&vTableInfo, &buf, &size));
177!
1823

1824
end:
178✔
1825
  nodesDestroyList(groupNew);
178✔
1826
  qStreamDestroyTableList(pTableList);
178✔
1827
  tDestroySStreamMsgVTableInfo(&vTableInfo);
178✔
1828
  api.metaReaderFn.clearReader(&metaReader);
178✔
1829
  STREAM_PRINT_LOG_END_WITHID(code, lino);
178!
1830
  SRpcMsg rsp = {
178✔
1831
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1832
  tmsgSendRsp(&rsp);
178✔
1833
  return code;
178✔
1834
}
1835

1836
static int32_t vnodeProcessStreamOTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req) {
67✔
1837
  int32_t                   code = 0;
67✔
1838
  int32_t                   lino = 0;
67✔
1839
  void*                     buf = NULL;
67✔
1840
  size_t                    size = 0;
67✔
1841
  SSTriggerOrigTableInfoRsp oTableInfo = {0};
67✔
1842
  SMetaReader               metaReader = {0};
67✔
1843
  int64_t streamId = req->base.streamId;
67✔
1844
  stsDebug("vgId:%d %s start", TD_VID(pVnode), __func__);
67✔
1845

1846
  SStorageAPI api = {0};
67✔
1847
  initStorageAPI(&api);
67✔
1848

1849
  SArray* cols = req->origTableInfoReq.cols;
67✔
1850
  STREAM_CHECK_NULL_GOTO(cols, terrno);
67!
1851

1852
  oTableInfo.cols = taosArrayInit(taosArrayGetSize(cols), sizeof(OTableInfoRsp));
67✔
1853

1854
  STREAM_CHECK_NULL_GOTO(oTableInfo.cols, terrno);
67!
1855

1856
  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
67✔
1857
  for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
276✔
1858
    OTableInfo*    oInfo = taosArrayGet(cols, i);
210✔
1859
    OTableInfoRsp* vTableInfo = taosArrayReserve(oTableInfo.cols, 1);
210✔
1860
    STREAM_CHECK_NULL_GOTO(oInfo, terrno);
210!
1861
    STREAM_CHECK_NULL_GOTO(vTableInfo, terrno);
210!
1862
    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByName(&metaReader, oInfo->refTableName));
210✔
1863
    vTableInfo->uid = metaReader.me.uid;
209✔
1864
    stsDebug("vgId:%d %s uid:%"PRId64, TD_VID(pVnode), __func__, vTableInfo->uid);
209✔
1865

1866
    SSchemaWrapper* sSchemaWrapper = NULL;
209✔
1867
    if (metaReader.me.type == TD_CHILD_TABLE) {
209✔
1868
      int64_t suid = metaReader.me.ctbEntry.suid;
192✔
1869
      vTableInfo->suid = suid;
192✔
1870
      tDecoderClear(&metaReader.coder);
192✔
1871
      STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, suid));
192!
1872
      sSchemaWrapper = &metaReader.me.stbEntry.schemaRow;
191✔
1873
    } else if (metaReader.me.type == TD_NORMAL_TABLE) {
17!
1874
      vTableInfo->suid = 0;
17✔
1875
      sSchemaWrapper = &metaReader.me.ntbEntry.schemaRow;
17✔
1876
    } else {
1877
      stError("invalid table type:%d", metaReader.me.type);
×
1878
    }
1879

1880
    for (size_t j = 0; j < sSchemaWrapper->nCols; j++) {
583!
1881
      SSchema* s = sSchemaWrapper->pSchema + j;
584✔
1882
      if (strcmp(s->name, oInfo->refColName) == 0) {
584✔
1883
        vTableInfo->cid = s->colId;
209✔
1884
        break;
209✔
1885
      }
1886
    }
1887
    if (vTableInfo->cid == 0) {
208!
1888
      stError("vgId:%d %s, not found col %s in table %s", TD_VID(pVnode), __func__, oInfo->refColName,
×
1889
              oInfo->refTableName);
1890
    }
1891
    tDecoderClear(&metaReader.coder);
208✔
1892
  }
1893

1894
  STREAM_CHECK_RET_GOTO(buildOTableInfoRsp(&oTableInfo, &buf, &size));
66!
1895

1896
end:
66✔
1897
  tDestroySTriggerOrigTableInfoRsp(&oTableInfo);
67✔
1898
  api.metaReaderFn.clearReader(&metaReader);
67✔
1899
  STREAM_PRINT_LOG_END(code, lino);
67!
1900
  SRpcMsg rsp = {
67✔
1901
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1902
  tmsgSendRsp(&rsp);
67✔
1903
  return code;
67✔
1904
}
1905

1906
static int32_t vnodeProcessStreamVTableTagInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req) {
647✔
1907
  int32_t                   code = 0;
647✔
1908
  int32_t                   lino = 0;
647✔
1909
  void*                     buf = NULL;
647✔
1910
  size_t                    size = 0;
647✔
1911
  SSDataBlock* pBlock = NULL;
647✔
1912

1913
  SMetaReader               metaReader = {0};
647✔
1914
  SMetaReader               metaReaderStable = {0};
647✔
1915
  int64_t streamId = req->base.streamId;
647✔
1916
  stsDebug("vgId:%d %s start", TD_VID(pVnode), __func__);
647✔
1917

1918
  SStorageAPI api = {0};
647✔
1919
  initStorageAPI(&api);
647✔
1920

1921
  SArray* cols = req->virTablePseudoColReq.cids;
648✔
1922
  STREAM_CHECK_NULL_GOTO(cols, terrno);
648!
1923

1924
  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
648✔
1925
  STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, req->virTablePseudoColReq.uid));
648!
1926

1927
  STREAM_CHECK_CONDITION_GOTO(metaReader.me.type != TD_VIRTUAL_CHILD_TABLE && metaReader.me.type != TD_VIRTUAL_NORMAL_TABLE, TSDB_CODE_INVALID_PARA);
647!
1928

1929
  STREAM_CHECK_RET_GOTO(createDataBlock(&pBlock));
647!
1930
  if (metaReader.me.type == TD_VIRTUAL_NORMAL_TABLE) {
648✔
1931
    STREAM_CHECK_CONDITION_GOTO (taosArrayGetSize(cols) < 1 && *(col_id_t*)taosArrayGet(cols, 0) != -1, TSDB_CODE_INVALID_PARA);
17!
1932
    SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN, -1);
17✔
1933
    STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
17!
1934
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, 1));
17!
1935
    pBlock->info.rows = 1;
17✔
1936
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, 0);
17✔
1937
    STREAM_CHECK_NULL_GOTO(pDst, terrno);
17!
1938
    STREAM_CHECK_RET_GOTO(varColSetVarData(pDst, 0, metaReader.me.name, strlen(metaReader.me.name), false));
17!
1939
  } else if (metaReader.me.type == TD_VIRTUAL_CHILD_TABLE){
631!
1940
    int64_t suid = metaReader.me.ctbEntry.suid;
631✔
1941
    api.metaReaderFn.readerReleaseLock(&metaReader);
631✔
1942
    api.metaReaderFn.initReader(&metaReaderStable, pVnode, META_READER_LOCK, &api.metaFn);
630✔
1943

1944
    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReaderStable, suid));
631!
1945
    SSchemaWrapper*  sSchemaWrapper = &metaReaderStable.me.stbEntry.schemaTag;
630✔
1946
    for (size_t i = 0; i < taosArrayGetSize(cols); i++){
2,119✔
1947
      col_id_t* id = taosArrayGet(cols, i);
1,488✔
1948
      STREAM_CHECK_NULL_GOTO(id, terrno);
1,488!
1949
      if (*id == -1) {
1,488✔
1950
        SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN, -1);
630✔
1951
        STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
630!
1952
        continue;
631✔
1953
      }
1954
      size_t j = 0;
858✔
1955
      for (; j < sSchemaWrapper->nCols; j++) {
1,470!
1956
        SSchema* s = sSchemaWrapper->pSchema + j;
1,470✔
1957
        if (s->colId == *id) {
1,470✔
1958
          SColumnInfoData idata = createColumnInfoData(s->type, s->bytes, s->colId);
858✔
1959
          STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
858!
1960
          break;
858✔
1961
        }
1962
      }
1963
      if (j == sSchemaWrapper->nCols) {
858!
1964
        SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_NULL, CHAR_BYTES, *id);
×
1965
        STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
×
1966
      }
1967
    }
1968
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, 1));
630!
1969
    pBlock->info.rows = 1;
631✔
1970
    
1971
    for (size_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++){
2,121✔
1972
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
1,489✔
1973
      STREAM_CHECK_NULL_GOTO(pDst, terrno);
1,489!
1974

1975
      if (pDst->info.colId == -1) {
1,489✔
1976
        STREAM_CHECK_RET_GOTO(varColSetVarData(pDst, 0, metaReader.me.name, strlen(metaReader.me.name), false));
631!
1977
        continue;
631✔
1978
      }
1979
      if (pDst->info.type == TSDB_DATA_TYPE_NULL) {
858!
1980
        STREAM_CHECK_RET_GOTO(colDataSetVal(pDst, 0, NULL, true));
×
1981
        continue;
×
1982
      }
1983

1984
      STagVal val = {0};
858✔
1985
      val.cid = pDst->info.colId;
858✔
1986
      const char* p = api.metaFn.extractTagVal(metaReader.me.ctbEntry.pTags, pDst->info.type, &val);
858✔
1987

1988
      char* data = NULL;
858✔
1989
      if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
858!
1990
        data = tTagValToData((const STagVal*)p, false);
822✔
1991
      } else {
1992
        data = (char*)p;
36✔
1993
      }
1994

1995
      STREAM_CHECK_RET_GOTO(colDataSetVal(pDst, 0, data,
859!
1996
                            (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data))));
1997

1998
      if ((pDst->info.type != TSDB_DATA_TYPE_JSON) && (p != NULL) && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
859!
1999
          (data != NULL)) {
2000
        taosMemoryFree(data);
108!
2001
      }
2002
    }
2003
  } else {
2004
    stError("vgId:%d %s, invalid table type:%d", TD_VID(pVnode), __func__, metaReader.me.type);
×
2005
    code = TSDB_CODE_INVALID_PARA;
×
2006
    goto end;
×
2007
  }
2008
  
2009
  stsDebug("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlock->info.rows);
647✔
2010
  printDataBlock(pBlock, __func__, "");
647✔
2011
  STREAM_CHECK_RET_GOTO(buildRsp(pBlock, &buf, &size));
647!
2012

2013
end:
647✔
2014
  api.metaReaderFn.clearReader(&metaReaderStable);
647✔
2015
  api.metaReaderFn.clearReader(&metaReader);
648✔
2016
  STREAM_PRINT_LOG_END(code, lino);
648!
2017
  SRpcMsg rsp = {
648✔
2018
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
2019
  tmsgSendRsp(&rsp);
648✔
2020
  blockDataDestroy(pBlock);
648✔
2021
  return code;
648✔
2022
}
2023

2024
static int32_t vnodeProcessStreamFetchMsg(SVnode* pVnode, SRpcMsg* pMsg) {
14,994✔
2025
  int32_t            code = 0;
14,994✔
2026
  int32_t            lino = 0;
14,994✔
2027
  void*              buf = NULL;
14,994✔
2028
  size_t             size = 0;
14,994✔
2029
  void*              taskAddr = NULL;
14,994✔
2030
  SArray*            pResList = NULL;
14,994✔
2031

2032
  SResFetchReq req = {0};
14,994✔
2033
  STREAM_CHECK_CONDITION_GOTO(tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0,
14,994!
2034
                              TSDB_CODE_QRY_INVALID_INPUT);
2035
  SArray* calcInfoList = (SArray*)qStreamGetReaderInfo(req.queryId, req.taskId, &taskAddr);
14,994✔
2036
  STREAM_CHECK_NULL_GOTO(calcInfoList, terrno);
14,994!
2037

2038
  STREAM_CHECK_CONDITION_GOTO(req.execId < 0, TSDB_CODE_INVALID_PARA);
14,994!
2039
  SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo = taosArrayGetP(calcInfoList, req.execId);
14,994✔
2040
  STREAM_CHECK_NULL_GOTO(sStreamReaderCalcInfo, terrno);
14,994!
2041
  void* pTask = sStreamReaderCalcInfo->pTask;
14,994✔
2042
  ST_TASK_DLOG("vgId:%d %s start, execId:%d, reset:%d, pTaskInfo:%p, scan type:%d", TD_VID(pVnode), __func__, req.execId, req.reset,
14,994✔
2043
               sStreamReaderCalcInfo->pTaskInfo, nodeType(sStreamReaderCalcInfo->calcAst->pNode));
2044

2045
  if (req.reset || sStreamReaderCalcInfo->pTaskInfo == NULL) {
14,994!
2046
    qDestroyTask(sStreamReaderCalcInfo->pTaskInfo);
14,940✔
2047
    int64_t uid = 0;
14,940✔
2048
    if (req.dynTbname) {
14,940✔
2049
      SArray* vals = req.pStRtFuncInfo->pStreamPartColVals;
6✔
2050
      for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
6!
2051
        SStreamGroupValue* pValue = taosArrayGet(vals, i);
6✔
2052
        if (pValue != NULL && pValue->isTbname) {
6!
2053
          uid = pValue->uid;
6✔
2054
          break;
6✔
2055
        }
2056
      }
2057
    }
2058
    
2059
    SReadHandle handle = {0};
14,940✔
2060
    handle.vnode = pVnode;
14,940✔
2061
    handle.uid = uid;
14,940✔
2062

2063
    initStorageAPI(&handle.api);
14,940✔
2064
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(sStreamReaderCalcInfo->calcAst->pNode) ||
14,940✔
2065
      QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == nodeType(sStreamReaderCalcInfo->calcAst->pNode)){
14,505✔
2066
      STimeRangeNode* node = (STimeRangeNode*)((STableScanPhysiNode*)(sStreamReaderCalcInfo->calcAst->pNode))->pTimeRange;
14,880✔
2067
      if (node != NULL) {
14,880✔
2068
        STREAM_CHECK_RET_GOTO(processCalaTimeRange(sStreamReaderCalcInfo, &req, node, &handle));
14,866!
2069
      }
2070
    }
2071

2072
    TSWAP(sStreamReaderCalcInfo->rtInfo.funcInfo, *req.pStRtFuncInfo);
14,940✔
2073
    handle.streamRtInfo = &sStreamReaderCalcInfo->rtInfo;
14,940✔
2074

2075
    // if (sStreamReaderCalcInfo->pTaskInfo == NULL) {
2076
    STREAM_CHECK_RET_GOTO(qCreateStreamExecTaskInfo(&sStreamReaderCalcInfo->pTaskInfo,
14,940✔
2077
                                                    sStreamReaderCalcInfo->calcScanPlan, &handle, NULL, TD_VID(pVnode),
2078
                                                    req.taskId));
2079
    // } else {
2080
    // STREAM_CHECK_RET_GOTO(qResetTableScan(sStreamReaderCalcInfo->pTaskInfo, handle.winRange));
2081
    // }
2082

2083
    STREAM_CHECK_RET_GOTO(qSetTaskId(sStreamReaderCalcInfo->pTaskInfo, req.taskId, req.queryId));
14,934!
2084
  }
2085

2086
  if (req.pOpParam != NULL) {
14,988✔
2087
    qUpdateOperatorParam(sStreamReaderCalcInfo->pTaskInfo, req.pOpParam);
48✔
2088
  }
2089
  
2090
  pResList = taosArrayInit(4, POINTER_BYTES);
14,988✔
2091
  STREAM_CHECK_NULL_GOTO(pResList, terrno);
14,988!
2092
  uint64_t ts = 0;
14,988✔
2093
  bool     hasNext = false;
14,988✔
2094
  STREAM_CHECK_RET_GOTO(qExecTaskOpt(sStreamReaderCalcInfo->pTaskInfo, pResList, &ts, &hasNext, NULL, req.pOpParam != NULL));
14,988✔
2095

2096
  for(size_t i = 0; i < taosArrayGetSize(pResList); i++){
24,334✔
2097
    SSDataBlock* pBlock = taosArrayGetP(pResList, i);
9,360✔
2098
    if (pBlock == NULL) continue;
9,360!
2099
    printDataBlock(pBlock, __func__, "fetch");
9,360✔
2100
    if (sStreamReaderCalcInfo->rtInfo.funcInfo.withExternalWindow) {
9,360✔
2101
      STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, sStreamReaderCalcInfo->pFilterInfo));
330!
2102
      printDataBlock(pBlock, __func__, "fetch filter");
330✔
2103
    }
2104
  }
2105

2106
  STREAM_CHECK_RET_GOTO(streamBuildFetchRsp(pResList, hasNext, &buf, &size, pVnode->config.tsdbCfg.precision));
14,974!
2107
  ST_TASK_DLOG("vgId:%d %s end:", TD_VID(pVnode), __func__);
14,974✔
2108

2109
end:
148✔
2110
  taosArrayDestroy(pResList);
14,994✔
2111
  streamReleaseTask(taskAddr);
14,994✔
2112

2113
  STREAM_PRINT_LOG_END(code, lino);
14,994!
2114
  SRpcMsg rsp = {.msgType = TDMT_STREAM_FETCH_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
14,994✔
2115
  tmsgSendRsp(&rsp);
14,994✔
2116
  tDestroySResFetchReq(&req);
14,994✔
2117
  return code;
14,994✔
2118
}
2119

2120
int32_t vnodeProcessStreamReaderMsg(SVnode* pVnode, SRpcMsg* pMsg) {
197,669✔
2121
  int32_t                   code = 0;
197,669✔
2122
  int32_t                   lino = 0;
197,669✔
2123
  SSTriggerPullRequestUnion req = {0};
197,669✔
2124
  void*                     taskAddr = NULL;
197,669✔
2125

2126
  vDebug("vgId:%d, msg:%p in stream reader queue is processing", pVnode->config.vgId, pMsg);
197,669✔
2127
  if (!syncIsReadyForRead(pVnode->sync)) {
197,681✔
2128
    vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
88✔
2129
    return 0;
94✔
2130
  }
2131

2132
  if (pMsg->msgType == TDMT_STREAM_FETCH) {
197,605✔
2133
    return vnodeProcessStreamFetchMsg(pVnode, pMsg);
14,994✔
2134
  } else if (pMsg->msgType == TDMT_STREAM_TRIGGER_PULL) {
182,611✔
2135
    void*   pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
182,604✔
2136
    int32_t len = pMsg->contLen - sizeof(SMsgHead);
182,604✔
2137
    STREAM_CHECK_RET_GOTO(tDeserializeSTriggerPullRequest(pReq, len, &req));
182,604!
2138
    stDebug("vgId:%d %s start, type:%d, streamId:%" PRIx64 ", readerTaskId:%" PRIx64 ", sessionId:%" PRIx64,
182,594✔
2139
            TD_VID(pVnode), __func__, req.base.type, req.base.streamId, req.base.readerTaskId, req.base.sessionId);
2140
    SStreamTriggerReaderInfo* sStreamReaderInfo = (STRIGGER_PULL_OTABLE_INFO == req.base.type) ? NULL : qStreamGetReaderInfo(req.base.streamId, req.base.readerTaskId, &taskAddr);
182,601✔
2141
    switch (req.base.type) {
182,585!
2142
      case STRIGGER_PULL_SET_TABLE:
66✔
2143
        code = vnodeProcessStreamSetTableReq(pVnode, pMsg, &req, sStreamReaderInfo);
66✔
2144
        break;
66✔
2145
      case STRIGGER_PULL_LAST_TS:
339✔
2146
        code = vnodeProcessStreamLastTsReq(pVnode, pMsg, &req, sStreamReaderInfo);
339✔
2147
        break;
337✔
2148
      case STRIGGER_PULL_FIRST_TS:
331✔
2149
        code = vnodeProcessStreamFirstTsReq(pVnode, pMsg, &req, sStreamReaderInfo);
331✔
2150
        break;
331✔
2151
      case STRIGGER_PULL_TSDB_META:
677✔
2152
      case STRIGGER_PULL_TSDB_META_NEXT:
2153
        code = vnodeProcessStreamTsdbMetaReq(pVnode, pMsg, &req, sStreamReaderInfo);
677✔
2154
        break;
677✔
2155
      case STRIGGER_PULL_TSDB_TS_DATA:
36✔
2156
        code = vnodeProcessStreamTsdbTsDataReq(pVnode, pMsg, &req, sStreamReaderInfo);
36✔
2157
        break;
36✔
2158
      case STRIGGER_PULL_TSDB_TRIGGER_DATA:
341✔
2159
      case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT:
2160
        code = vnodeProcessStreamTsdbTriggerDataReq(pVnode, pMsg, &req, sStreamReaderInfo);
341✔
2161
        break;
341✔
2162
      case STRIGGER_PULL_TSDB_CALC_DATA:
163,420✔
2163
      case STRIGGER_PULL_TSDB_CALC_DATA_NEXT:
2164
        code = vnodeProcessStreamTsdbCalcDataReq(pVnode, pMsg, &req, sStreamReaderInfo);
163,420✔
2165
        break;
163,408✔
2166
      case STRIGGER_PULL_TSDB_DATA:
319✔
2167
      case STRIGGER_PULL_TSDB_DATA_NEXT:
2168
        code = vnodeProcessStreamTsdbVirtalDataReq(pVnode, pMsg, &req, sStreamReaderInfo);
319✔
2169
        break;
319✔
2170
      case STRIGGER_PULL_WAL_META:
8,043✔
2171
        code = vnodeProcessStreamWalMetaReq(pVnode, pMsg, &req, sStreamReaderInfo);
8,043✔
2172
        break;
8,058✔
2173
      case STRIGGER_PULL_WAL_TS_DATA:
7,527✔
2174
      case STRIGGER_PULL_WAL_TRIGGER_DATA:
2175
      case STRIGGER_PULL_WAL_CALC_DATA:
2176
      case STRIGGER_PULL_WAL_DATA:
2177
        code = vnodeProcessStreamWalDataReq(pVnode, pMsg, &req, sStreamReaderInfo);
7,527✔
2178
        break;
7,527✔
2179
      case STRIGGER_PULL_GROUP_COL_VALUE:
594✔
2180
        code = vnodeProcessStreamGroupColValueReq(pVnode, pMsg, &req, sStreamReaderInfo);
594✔
2181
        break;
595✔
2182
      case STRIGGER_PULL_VTABLE_INFO:
178✔
2183
        code = vnodeProcessStreamVTableInfoReq(pVnode, pMsg, &req, sStreamReaderInfo);
178✔
2184
        break;
178✔
2185
      case STRIGGER_PULL_VTABLE_PSEUDO_COL:
647✔
2186
        code = vnodeProcessStreamVTableTagInfoReq(pVnode, pMsg, &req);
647✔
2187
        break;
648✔
2188
      case STRIGGER_PULL_OTABLE_INFO:
67✔
2189
        code = vnodeProcessStreamOTableInfoReq(pVnode, pMsg, &req);
67✔
2190
        break;
67✔
2191
      default:
×
2192
        vError("unknown inner msg type:%d in stream reader queue", req.base.type);
×
2193
        code = TSDB_CODE_APP_ERROR;
×
2194
        break;
×
2195
    }
2196
  } else {
2197
    vError("unknown msg type:%d in stream reader queue", pMsg->msgType);
7!
2198
    code = TSDB_CODE_APP_ERROR;
×
2199
  }
2200
end:
182,588✔
2201

2202
  streamReleaseTask(taskAddr);
182,588✔
2203

2204
  tDestroySTriggerPullRequest(&req);
182,600✔
2205
  STREAM_PRINT_LOG_END(code, lino);
182,593!
2206
  return code;
182,607✔
2207
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc