• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4747

21 Sep 2025 11:53PM UTC coverage: 58.002% (-1.1%) from 59.065%
#4747

push

travis-ci

web-flow
fix: refine python taos error log matching in checkAsan.sh (#33029)

* fix: refine python taos error log matching in checkAsan.sh

* fix: improve python taos error log matching in checkAsan.sh

133398 of 293157 branches covered (45.5%)

Branch coverage included in aggregate %.

201778 of 284713 relevant lines covered (70.87%)

5539418.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.74
/source/dnode/vnode/src/vnd/vnodeStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "scalar.h"
17
#include "streamReader.h"
18
#include "tdatablock.h"
19
#include "tdb.h"
20
#include "tencode.h"
21
#include "tglobal.h"
22
#include "tmsg.h"
23
#include "vnd.h"
24
#include "vnode.h"
25
#include "vnodeInt.h"
26

27
// Declares a local SStreamTriggerReaderTaskInnerOptions variable named `options`
// and fills it from the reader info `sStreamInfo` plus the per-call arguments.
// `.suid` is copied from `sStreamInfo` only when `_uidList` is NULL; otherwise it is 0.
// Every macro parameter used as an expression is parenthesized so that compound
// arguments (e.g. `cond ? a : b`) expand with the intended precedence.
// Note: the expansion already ends with ';'.
#define BUILD_OPTION(options, sStreamInfo, _ver, _groupSort, _order, startTime, endTime, _schemas, _isSchema, _scanMode, \
                     _gid, _initReader, _uidList)                                                                        \
  SStreamTriggerReaderTaskInnerOptions options = {.suid = ((_uidList) == NULL ? (sStreamInfo)->suid : 0),                \
                                                  .uid = (sStreamInfo)->uid,                                             \
                                                  .ver = (_ver),                                                         \
                                                  .tableType = (sStreamInfo)->tableType,                                 \
                                                  .groupSort = (_groupSort),                                             \
                                                  .order = (_order),                                                     \
                                                  .twindows = {.skey = (startTime), .ekey = (endTime)},                  \
                                                  .pTagCond = (sStreamInfo)->pTagCond,                                   \
                                                  .pTagIndexCond = (sStreamInfo)->pTagIndexCond,                         \
                                                  .pConditions = (sStreamInfo)->pConditions,                             \
                                                  .partitionCols = (sStreamInfo)->partitionCols,                         \
                                                  .schemas = (_schemas),                                                 \
                                                  .isSchema = (_isSchema),                                               \
                                                  .scanMode = (_scanMode),                                               \
                                                  .gid = (_gid),                                                         \
                                                  .initReader = (_initReader),                                           \
                                                  .uidList = (_uidList)};
46

47
// Combines `session` and `type` into a single 64-bit key: `type` is shifted
// left by 32 bits and OR-ed with `session`.
static int64_t getSessionKey(int64_t session, int64_t type) {
  int64_t typeBits = type << 32;
  return session | typeBits;
}
29,791✔
48

49
static int32_t buildScheamFromMeta(SVnode* pVnode, int64_t uid, SArray** schemas);
50

51
// Copies one value of the column's declared width (`info.bytes`) from `data`
// into column `index` of pResBlock, at the slot addressed by the block's
// current row count (`info.rows`). The row count itself is not changed here.
// Returns 0 on success, or terrno when the column cannot be fetched.
// NOTE(review): no capacity check is done here; presumably callers reserve the
// row beforehand (scanSubmitData calls blockDataEnsureCapacity) — confirm.
static int32_t addColData(SSDataBlock* pResBlock, int32_t index, void* data) {
  SColumnInfoData* pColumn = taosArrayGet(pResBlock->pDataBlock, index);
  if (NULL == pColumn) return terrno;

  void* pSlot = pColumn->pData + pResBlock->info.rows * pColumn->info.bytes;
  memcpy(pSlot, data, pColumn->info.bytes);
  return 0;
}
60

61
// Asks the task's tsdb reader for its next data block via tsdNextDataBlock;
// `hasNext` is filled in by that call. When the call fails, the reader's
// current data block is released before the error code is returned.
static int32_t getTableDataInfo(SStreamReaderTaskInner* pTask, bool* hasNext) {
  int32_t ret = pTask->api.tsdReader.tsdNextDataBlock(pTask->pReader, hasNext);
  if (TSDB_CODE_SUCCESS == ret) {
    return ret;
  }

  pTask->api.tsdReader.tsdReaderReleaseDataBlock(pTask->pReader);
  return ret;
}
69

70
// Retrieves the reader's current data block into *ppRes through
// tsdReaderRetrieveDataBlock (the third argument is passed as NULL) and
// returns that call's result code.
static int32_t getTableData(SStreamReaderTaskInner* pTask, SSDataBlock** ppRes) {
  int32_t ret = pTask->api.tsdReader.tsdReaderRetrieveDataBlock(pTask->pReader, ppRes, NULL);
  return ret;
}
73

74
static int32_t buildOTableInfoRsp(const SSTriggerOrigTableInfoRsp* rsp, void** data, size_t* size) {
139✔
75
  int32_t code = 0;
139✔
76
  int32_t lino = 0;
139✔
77
  void*   buf = NULL;
139✔
78
  int32_t len = tSerializeSTriggerOrigTableInfoRsp(NULL, 0, rsp);
139✔
79
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
140!
80
  buf = rpcMallocCont(len);
140✔
81
  STREAM_CHECK_NULL_GOTO(buf, terrno);
139!
82
  int32_t actLen = tSerializeSTriggerOrigTableInfoRsp(buf, len, rsp);
139✔
83
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
140!
84
  *data = buf;
140✔
85
  *size = len;
140✔
86
  buf = NULL;
140✔
87
end:
140✔
88
  rpcFreeCont(buf);
140✔
89
  return code;
140✔
90
}
91

92
static int32_t buildVTableInfoRsp(const SStreamMsgVTableInfo* rsp, void** data, size_t* size) {
547✔
93
  int32_t code = 0;
547✔
94
  int32_t lino = 0;
547✔
95
  void*   buf = NULL;
547✔
96
  int32_t len = tSerializeSStreamMsgVTableInfo(NULL, 0, rsp);
547✔
97
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
548!
98
  buf = rpcMallocCont(len);
548✔
99
  STREAM_CHECK_NULL_GOTO(buf, terrno);
548!
100
  int32_t actLen = tSerializeSStreamMsgVTableInfo(buf, len, rsp);
548✔
101
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
548!
102
  *data = buf;
548✔
103
  *size = len;
548✔
104
  buf = NULL;
548✔
105
end:
548✔
106
  rpcFreeCont(buf);
548✔
107
  return code;
547✔
108
}
109

110
static int32_t buildTsRsp(const SStreamTsResponse* tsRsp, void** data, size_t* size) {
814✔
111
  int32_t code = 0;
814✔
112
  int32_t lino = 0;
814✔
113
  void*   buf = NULL;
814✔
114
  int32_t len = tSerializeSStreamTsResponse(NULL, 0, tsRsp);
814✔
115
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
814!
116
  buf = rpcMallocCont(len);
814✔
117
  STREAM_CHECK_NULL_GOTO(buf, terrno);
815!
118
  int32_t actLen = tSerializeSStreamTsResponse(buf, len, tsRsp);
815✔
119
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
815!
120
  *data = buf;
815✔
121
  *size = len;
815✔
122
  buf = NULL;
815✔
123
end:
815✔
124
  rpcFreeCont(buf);
815✔
125
  return code;
813✔
126
}
127

128

129
// Encodes `pBlock` into a buffer obtained from rpcMallocCont.
// A NULL block or a block with zero rows is treated as success and leaves
// *data / *size untouched. Otherwise, on success, *data receives the buffer
// and *size the value returned by blockGetEncodeSize (not the length returned
// by blockEncode). Returns terrno when allocation fails or blockEncode
// returns a negative value.
static int32_t buildRsp(SSDataBlock* pBlock, void** data, size_t* size) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   pCont = NULL;

  STREAM_CHECK_CONDITION_GOTO(pBlock == NULL || pBlock->info.rows == 0, TSDB_CODE_SUCCESS);

  size_t encodeCap = blockGetEncodeSize(pBlock);
  pCont = rpcMallocCont(encodeCap);
  STREAM_CHECK_NULL_GOTO(pCont, terrno);

  int32_t encodedLen = blockEncode(pBlock, pCont, encodeCap, taosArrayGetSize(pBlock->pDataBlock));
  STREAM_CHECK_CONDITION_GOTO(encodedLen < 0, terrno);

  // Hand the buffer to the caller; clearing pCont keeps the cleanup from freeing it.
  *data = pCont;
  *size = encodeCap;
  pCont = NULL;

end:
  rpcFreeCont(pCont);
  return code;
}
146

147
// Re-targets the task's tsdb reader at the table list of the task's current
// group (`currentGroupIndex`): sets the reader's query table list, rebuilds
// pTask->cond from pTask->options, then resets the reader status with the new
// condition. Returns the first failing step's code, or 0.
static int32_t resetTsdbReader(SStreamReaderTaskInner* pTask) {
  int32_t        code = 0;
  int32_t        lino = 0;
  STableKeyInfo* pKeys = NULL;
  int32_t        numOfKeys = 1;

  STREAM_CHECK_RET_GOTO(qStreamGetTableList(pTask->pTableList, pTask->currentGroupIndex, &pKeys, &numOfKeys));
  STREAM_CHECK_RET_GOTO(pTask->api.tsdReader.tsdSetQueryTableList(pTask->pReader, pKeys, numOfKeys));

  // Drop the previous query condition before building a fresh one.
  cleanupQueryTableDataCond(&pTask->cond);
  STREAM_CHECK_RET_GOTO(qStreamInitQueryTableDataCond(&pTask->cond, pTask->options.order, pTask->options.schemas, true,
                                                      pTask->options.twindows, pTask->options.suid, pTask->options.ver));

  STREAM_CHECK_RET_GOTO(pTask->api.tsdReader.tsdReaderResetStatus(pTask->pReader, &pTask->cond));

end:
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
164

165
// Writes one WAL meta record into consecutive columns of `pBlock` at the
// block's current row slot (see addColData). Column order:
//   type, [gid — only when !isVTable], uid, skey, ekey, ver, rows.
// The block's row count is not advanced here; callers increment it.
// Returns the first addColData failure, or 0.
static int32_t buildWalMetaBlock(SSDataBlock* pBlock, int8_t type, int64_t gid, bool isVTable, int64_t uid,
                                 int64_t skey, int64_t ekey, int64_t ver, int64_t rows) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t col = 0;

  STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &type));
  if (!isVTable) {
    // The group-id column is written only for non-virtual tables.
    STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &gid));
  }
  STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &uid));
  STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &skey));
  STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &ekey));
  STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &ver));
  STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &rows));

end:
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
184

185
// Fills `pTSchema` as a one-column schema: sets numOfCols to 1, the schema
// version to `ver`, and the first column's id / type / byte width from the
// remaining arguments. The caller must supply storage for at least one column.
static void buildTSchema(STSchema* pTSchema, int32_t ver, col_id_t colId, int8_t type, int32_t bytes) {
  STColumn* pOnlyCol = &pTSchema->columns[0];

  pTSchema->numOfCols = 1;
  pTSchema->version = ver;

  pOnlyCol->colId = colId;
  pOnlyCol->type = type;
  pOnlyCol->bytes = bytes;
}
14,878✔
192

193
// Appends one WAL_SUBMIT_DATA meta record for `pSubmitTbData` to `pBlock`.
// The table is skipped (success, nothing appended) when its uid is not found
// in `pTableList`: a hash lookup when isVTable, qStreamUidInTableList otherwise.
// skey / ekey are read from column 0 of the first and the last submitted row,
// for both the column-data and the row-data submit format.
// Returns 0 on success or skip, otherwise the failing step's code.
int32_t retrieveWalMetaData(SSubmitTbData* pSubmitTbData, void* pTableList, bool isVTable, SSDataBlock* pBlock,
                            int64_t ver) {
  int32_t   code = 0;
  int32_t   lino = 0;
  STSchema* pTSchema = NULL;
  SColVal   tsVal = {0};
  uint64_t  gid = 0;
  int64_t   skey = 0;
  int64_t   ekey = 0;
  int32_t   numOfRows = 0;
  int64_t   uid = pSubmitTbData->uid;

  if (isVTable) {
    STREAM_CHECK_CONDITION_GOTO(taosHashGet(pTableList, &uid, sizeof(uid)) == NULL, TDB_CODE_SUCCESS);
  } else {
    STREAM_CHECK_CONDITION_GOTO(!qStreamUidInTableList(pTableList, uid), TDB_CODE_SUCCESS);
    gid = qStreamGetGroupId(pTableList, uid);
  }

  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    // Column format: the first entry of aCol supplies the row count and both keys.
    SColData* pTsCol = taosArrayGet(pSubmitTbData->aCol, 0);
    STREAM_CHECK_NULL_GOTO(pTsCol, terrno);
    numOfRows = pTsCol->nVal;

    STREAM_CHECK_RET_GOTO(tColDataGetValue(pTsCol, 0, &tsVal));
    skey = VALUE_GET_TRIVIAL_DATUM(&tsVal.value);

    STREAM_CHECK_RET_GOTO(tColDataGetValue(pTsCol, numOfRows - 1, &tsVal));
    ekey = VALUE_GET_TRIVIAL_DATUM(&tsVal.value);
  } else {
    // Row format: build a throw-away one-column (timestamp) schema to decode column 0.
    numOfRows = taosArrayGetSize(pSubmitTbData->aRowP);
    SRow* pEdgeRow = taosArrayGetP(pSubmitTbData->aRowP, 0);
    STREAM_CHECK_NULL_GOTO(pEdgeRow, terrno);

    pTSchema = taosMemoryCalloc(1, sizeof(STSchema) + sizeof(STColumn));
    STREAM_CHECK_NULL_GOTO(pTSchema, terrno);
    buildTSchema(pTSchema, pSubmitTbData->sver, PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES);

    STREAM_CHECK_RET_GOTO(tRowGet(pEdgeRow, pTSchema, 0, &tsVal));
    skey = VALUE_GET_TRIVIAL_DATUM(&tsVal.value);

    pEdgeRow = taosArrayGetP(pSubmitTbData->aRowP, numOfRows - 1);
    STREAM_CHECK_RET_GOTO(tRowGet(pEdgeRow, pTSchema, 0, &tsVal));
    ekey = VALUE_GET_TRIVIAL_DATUM(&tsVal.value);
  }

  STREAM_CHECK_RET_GOTO(buildWalMetaBlock(pBlock, WAL_SUBMIT_DATA, gid, isVTable, uid, skey, ekey, ver, numOfRows));
  pBlock->info.rows++;
  stDebug("stream reader scan submit data:uid %" PRIu64 ", skey %" PRIu64 ", ekey %" PRIu64 ", gid %" PRIu64
          ", rows:%d",
          uid, skey, ekey, gid, numOfRows);

end:
  taosMemoryFree(pTSchema);
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
248

249
// Copies the rows of one submitted table block into `pBlock`.
//   pVnode        - vnode whose meta supplies the table schema for the block's sver
//   pSubmitTbData - decoded submit data for a single table (column or row format)
//   pBlock        - destination block; its columns are matched to source columns by
//                   column id, and its row count is set to the number of rows copied
//   window        - optional; when non-NULL only rows with skey <= ts <= ekey are copied
// Returns 0 on success, TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND when the schema is
// missing, or the failing step's code.
int32_t retrieveWalData(SVnode* pVnode, SSubmitTbData* pSubmitTbData, SSDataBlock* pBlock, STimeWindow* window) {
  stDebug("stream reader retrieve data block %p", pSubmitTbData);
  int32_t code = 0;
  int32_t lino = 0;

  int32_t ver = pSubmitTbData->sver;
  int64_t uid = pSubmitTbData->uid;
  int32_t numOfRows = 0;

  STSchema* schemas = metaGetTbTSchema(pVnode->pMeta, uid, ver, 1);
  STREAM_CHECK_NULL_GOTO(schemas, TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND);
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    SColData* pTsCol = taosArrayGet(pSubmitTbData->aCol, 0);
    STREAM_CHECK_NULL_GOTO(pTsCol, terrno);
    int32_t rowStart = 0;
    int32_t rowEnd = pTsCol->nVal;
    if (window != NULL) {
      // Locate [rowStart, rowEnd): first row with ts >= skey, first row with ts > ekey.
      SColVal tsVal = {0};
      rowStart = -1;
      rowEnd = -1;
      for (int32_t k = 0; k < pTsCol->nVal; k++) {
        STREAM_CHECK_RET_GOTO(tColDataGetValue(pTsCol, k, &tsVal));
        int64_t ts = VALUE_GET_TRIVIAL_DATUM(&tsVal.value);
        if (ts >= window->skey && rowStart == -1) {
          rowStart = k;
        }
        if (ts > window->ekey && rowEnd == -1) {
          rowEnd = k;
        }
      }
      // Nothing falls inside the window: leave the block untouched.
      STREAM_CHECK_CONDITION_GOTO(rowStart == -1 || rowStart == rowEnd, TDB_CODE_SUCCESS);

      if (rowStart != -1 && rowEnd == -1) {
        rowEnd = pTsCol->nVal;
      }
    }
    numOfRows = rowEnd - rowStart;
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, numOfRows));
    for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
      STREAM_CHECK_NULL_GOTO(pColData, terrno);
      if (i >= taosArrayGetSize(pSubmitTbData->aCol)) {
        break;
      }
      for (int32_t j = 0; j < taosArrayGetSize(pSubmitTbData->aCol); j++) {
        SColData* pSrcCol = taosArrayGet(pSubmitTbData->aCol, j);
        STREAM_CHECK_NULL_GOTO(pSrcCol, terrno);
        if (pSrcCol->cid == pColData->info.colId) {
          for (int32_t k = rowStart; k < rowEnd; k++) {
            SColVal colVal = {0};
            STREAM_CHECK_RET_GOTO(tColDataGetValue(pSrcCol, k, &colVal));
            // Destination rows are 0-based: only (rowEnd - rowStart) rows were reserved,
            // so source row k lands at k - rowStart.
            STREAM_CHECK_RET_GOTO(colDataSetVal(pColData, k - rowStart,
                                                VALUE_GET_DATUM(&colVal.value, colVal.value.type),
                                                !COL_VAL_IS_VALUE(&colVal)));
          }
        }
      }
    }
  } else {
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, taosArrayGetSize(pSubmitTbData->aRowP)));
    for (int32_t j = 0; j < taosArrayGetSize(pSubmitTbData->aRowP); j++) {
      SRow* pRow = taosArrayGetP(pSubmitTbData->aRowP, j);
      STREAM_CHECK_NULL_GOTO(pRow, terrno);
      SColVal tsVal = {0};
      STREAM_CHECK_RET_GOTO(tRowGet(pRow, schemas, 0, &tsVal));
      int64_t ts = VALUE_GET_TRIVIAL_DATUM(&tsVal.value);
      if (window != NULL && (ts < window->skey || ts > window->ekey)) {
        continue;
      }
      for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {  // reader todo test null
        if (i >= schemas->numOfCols) {
          break;
        }
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
        STREAM_CHECK_NULL_GOTO(pColData, terrno);

        // Walk the schema columns until one with id >= the target id is reached.
        // The walk is bounded by numOfCols so a target id larger than every schema
        // column id yields NULL instead of indexing past the end of the schema.
        SColVal colVal = {0};
        bool    matched = false;
        for (int32_t sourceIdx = 0; sourceIdx < schemas->numOfCols; sourceIdx++) {
          STREAM_CHECK_RET_GOTO(tRowGet(pRow, schemas, sourceIdx, &colVal));
          if (colVal.cid >= pColData->info.colId) {
            matched = (colVal.cid == pColData->info.colId);
            break;
          }
        }

        if (matched && COL_VAL_IS_VALUE(&colVal)) {
          if (IS_VAR_DATA_TYPE(colVal.value.type) || colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
            STREAM_CHECK_RET_GOTO(
                varColSetVarData(pColData, numOfRows, colVal.value.pData, colVal.value.nData, !COL_VAL_IS_VALUE(&colVal)));
          } else {
            STREAM_CHECK_RET_GOTO(
                colDataSetVal(pColData, numOfRows, (const char*)(&(colVal.value.val)), !COL_VAL_IS_VALUE(&colVal)));
          }
        } else {
          colDataSetNULL(pColData, numOfRows);
        }
      }
      numOfRows++;
    }
  }

  pBlock->info.rows = numOfRows;

end:
  taosMemoryFree(schemas);
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
355

356
// Appends one WAL_DELETE_DATA meta record for table `uid` to `pBlock`, using
// the delete request's skey/ekey and a row count of 1.
// The table is skipped (success, nothing appended) when it is not tracked:
// hash lookup miss when isVTable, or a group id of -1 otherwise.
static int32_t buildDeleteData(void* pTableList, bool isVTable, SSDataBlock* pBlock, SDeleteRes* req, int64_t uid, int64_t ver){
  int32_t  code = 0;
  int32_t  lino = 0;
  uint64_t gid = 0;

  stDebug("stream reader scan delete start data:uid %" PRIu64 ", skey %" PRIu64 ", ekey %" PRIu64, uid, req->skey, req->ekey);

  if (!isVTable) {
    gid = qStreamGetGroupId(pTableList, uid);
    STREAM_CHECK_CONDITION_GOTO(gid == -1, TDB_CODE_SUCCESS);
  } else {
    STREAM_CHECK_CONDITION_GOTO(taosHashGet(pTableList, &uid, sizeof(uid)) == NULL, TDB_CODE_SUCCESS);
  }

  STREAM_CHECK_RET_GOTO(
      buildWalMetaBlock(pBlock, WAL_DELETE_DATA, gid, isVTable, uid, req->skey, req->ekey, ver, 1));
  pBlock->info.rows++;

  stDebug("stream reader scan delete end data:uid %" PRIu64 ", skey %" PRIu64 ", ekey %" PRIu64, uid, req->skey, req->ekey);

end:
  return code;
}
375

376
// Decodes a delete request from `data`/`len` and appends one delete meta
// record per uid in the decoded uid list (see buildDeleteData).
// Returns 0 on success, terrno when the uid list cannot be allocated or an
// element cannot be fetched, or the decoder's / builder's error code.
static int32_t scanDeleteData(void* pTableList, bool isVTable, SSDataBlock* pBlock, void* data, int32_t len,
                              int64_t ver) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SDecoder   decoder = {0};
  SDeleteRes req = {0};

  tDecoderInit(&decoder, data, len);

  // The array is handed to the decoder below, so fail early instead of
  // passing it a NULL list when the allocation fails.
  req.uidList = taosArrayInit(0, sizeof(tb_uid_t));
  STREAM_CHECK_NULL_GOTO(req.uidList, terrno);

  STREAM_CHECK_RET_GOTO(tDecodeDeleteRes(&decoder, &req));

  for (int32_t i = 0; i < taosArrayGetSize(req.uidList); i++) {
    // Elements were sized as tb_uid_t at init, so read them through that type.
    tb_uid_t* pUid = taosArrayGet(req.uidList, i);
    STREAM_CHECK_NULL_GOTO(pUid, terrno);
    STREAM_CHECK_RET_GOTO(buildDeleteData(pTableList, isVTable, pBlock, &req, *pUid, ver));
  }

end:
  taosArrayDestroy(req.uidList);
  tDecoderClear(&decoder);
  return code;
}
397

398
// Decodes a batch drop-table request from `data`/`len` and appends one
// WAL_DELETE_TABLE meta record (skey = ekey = 0, rows = 1) per dropped table
// that is tracked in `pTableList`. Untracked tables are silently skipped:
// hash lookup miss when isVTable, or a group id of -1 otherwise.
static int32_t scanDropTable(void* pTableList, bool isVTable, SSDataBlock* pBlock, void* data, int32_t len,
                             int64_t ver) {
  int32_t          code = 0;
  int32_t          lino = 0;
  SVDropTbBatchReq req = {0};
  SDecoder         decoder = {0};

  tDecoderInit(&decoder, data, len);
  STREAM_CHECK_RET_GOTO(tDecodeSVDropTbBatchReq(&decoder, &req));

  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    SVDropTbReq* pDrop = &req.pReqs[iReq];
    STREAM_CHECK_NULL_GOTO(pDrop, TSDB_CODE_INVALID_PARA);

    uint64_t gid = 0;
    bool     tracked = false;
    if (isVTable) {
      tracked = (taosHashGet(pTableList, &pDrop->uid, sizeof(pDrop->uid)) != NULL);
    } else {
      gid = qStreamGetGroupId(pTableList, pDrop->uid);
      tracked = !(gid == -1);
    }
    if (!tracked) {
      continue;
    }

    STREAM_CHECK_RET_GOTO(buildWalMetaBlock(pBlock, WAL_DELETE_TABLE, gid, isVTable, pDrop->uid, 0, 0, ver, 1));
    pBlock->info.rows++;
    stDebug("stream reader scan drop :uid %" PRIu64 ", gid %" PRIu64, pDrop->uid, gid);
  }

end:
  tDecoderClear(&decoder);
  return code;
}
429

430
// Decodes a submit request from `data`/`len` and appends one meta record per
// submitted table block to `pBlock` (see retrieveWalMetaData). Before the
// loop, capacity is reserved for the block's current rows plus one row per
// table block. The decoded request and the decoder are always released.
static int32_t scanSubmitData(void* pTableList, bool isVTable, SSDataBlock* pBlock, void* data, int32_t len,
                              int64_t ver) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SSubmitReq2 submit = {0};
  SDecoder    decoder = {0};

  tDecoderInit(&decoder, data, len);
  STREAM_CHECK_RET_GOTO(tDecodeSubmitReq(&decoder, &submit, NULL));

  int32_t numOfBlocks = taosArrayGetSize(submit.aSubmitTbData);
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, pBlock->info.rows + numOfBlocks));

  for (int32_t nextBlk = 0; nextBlk < numOfBlocks; nextBlk++) {
    stDebug("stream reader scan submit, next data block %d/%d", nextBlk, numOfBlocks);
    SSubmitTbData* pTbData = taosArrayGet(submit.aSubmitTbData, nextBlk);
    STREAM_CHECK_NULL_GOTO(pTbData, terrno);
    STREAM_CHECK_RET_GOTO(retrieveWalMetaData(pTbData, pTableList, isVTable, pBlock, ver));
  }

end:
  tDestroySubmitReq(&submit, TSDB_MSG_FLG_DECODE);
  tDecoderClear(&decoder);
  return code;
}
456

457
// Scans the vnode's WAL starting at version lastVer + 1 and appends meta
// records to `pBlock`:
//   - TDMT_VND_DELETE     -> scanDeleteData (only when deleteData != 0)
//   - TDMT_VND_DROP_TABLE -> scanDropTable  (only when deleteTb != 0)
//   - TDMT_VND_SUBMIT     -> scanSubmitData
// The scan stops when the seek or the next-message call fails (treated as
// success), when an entry's ingestTs / 1000 exceeds `ctime`, or once the block
// holds at least STREAM_RETURN_ROWS_NUM rows.
// *retVer is refreshed with the WAL's applied version before the seek and at
// the top of every loop iteration.
static int32_t scanWal(SVnode* pVnode, void* pTableList, bool isVTable, SSDataBlock* pBlock, int64_t lastVer,
                       int8_t deleteData, int8_t deleteTb, int64_t ctime, int64_t* retVer) {
  int32_t code = 0;
  int32_t lino = 0;

  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);
  *retVer = walGetAppliedVer(pWalReader->pWal);
  STREAM_CHECK_CONDITION_GOTO(walReaderSeekVer(pWalReader, lastVer + 1) != 0, TSDB_CODE_SUCCESS);

  for (;;) {
    *retVer = walGetAppliedVer(pWalReader->pWal);
    STREAM_CHECK_CONDITION_GOTO(walNextValidMsg(pWalReader, true) < 0, TSDB_CODE_SUCCESS);

    SWalCont* pCont = &pWalReader->pHead->head;
    if (pCont->ingestTs / 1000 > ctime) {
      break;
    }

    // Default payload view skips SMsgHead; submit messages use a different header below.
    void*   pMsg = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
    int32_t msgLen = pCont->bodyLen - sizeof(SMsgHead);
    int64_t ver = pCont->version;

    stDebug("vgId:%d stream reader scan wal ver:%" PRId64 ", type:%d, deleteData:%d, deleteTb:%d",
      TD_VID(pVnode), ver, pCont->msgType, deleteData, deleteTb);

    if (pCont->msgType == TDMT_VND_DELETE && deleteData != 0) {
      STREAM_CHECK_RET_GOTO(scanDeleteData(pTableList, isVTable, pBlock, pMsg, msgLen, ver));
    } else if (pCont->msgType == TDMT_VND_DROP_TABLE && deleteTb != 0) {
      STREAM_CHECK_RET_GOTO(scanDropTable(pTableList, isVTable, pBlock, pMsg, msgLen, ver));
    } else if (pCont->msgType == TDMT_VND_SUBMIT) {
      pMsg = POINTER_SHIFT(pCont->body, sizeof(SSubmitReq2Msg));
      msgLen = pCont->bodyLen - sizeof(SSubmitReq2Msg);
      STREAM_CHECK_RET_GOTO(scanSubmitData(pTableList, isVTable, pBlock, pMsg, msgLen, ver));
    }

    if (pBlock->info.rows >= STREAM_RETURN_ROWS_NUM) {
      break;
    }
  }

end:
  walCloseReader(pWalReader);
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
499

500
// Reads the single WAL entry at version `ver`, which must be a submit message
// (otherwise TSDB_CODE_STREAM_WAL_VER_NOT_DATA is returned), and for every
// table block in it whose uid equals `uid`:
//   decodes the rows into `pBlock` (optionally limited by `window`),
//   merges `pBlock` into `pBlockRet`, then cleans `pBlock` up for reuse.
// The decoded request, the WAL reader and the decoder are always released.
int32_t scanWalOneVer(SVnode* pVnode, SSDataBlock* pBlock, SSDataBlock* pBlockRet,
                      int64_t ver, int64_t uid, STimeWindow* window) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SDecoder    decoder = {0};
  SSubmitReq2 submit = {0};

  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);

  STREAM_CHECK_RET_GOTO(walFetchHead(pWalReader, ver));
  STREAM_CHECK_CONDITION_GOTO(pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT, TSDB_CODE_STREAM_WAL_VER_NOT_DATA);
  STREAM_CHECK_RET_GOTO(walFetchBody(pWalReader));

  // The head is re-read through pWalReader only after the body has been fetched.
  SWalCont* pCont = &pWalReader->pHead->head;
  void*     pBody = POINTER_SHIFT(pCont->body, sizeof(SSubmitReq2Msg));
  int32_t   bodyLen = pCont->bodyLen - sizeof(SSubmitReq2Msg);

  tDecoderInit(&decoder, pBody, bodyLen);
  STREAM_CHECK_RET_GOTO(tDecodeSubmitReq(&decoder, &submit, NULL));

  int32_t numOfBlocks = taosArrayGetSize(submit.aSubmitTbData);
  for (int32_t nextBlk = 0; nextBlk < numOfBlocks; nextBlk++) {
    stDebug("stream reader next data block %d/%d", nextBlk, numOfBlocks);
    SSubmitTbData* pTbData = taosArrayGet(submit.aSubmitTbData, nextBlk);
    STREAM_CHECK_NULL_GOTO(pTbData, terrno);
    if (pTbData->uid != uid) {
      stDebug("stream reader skip data block uid:%" PRId64, pTbData->uid);
      continue;
    }

    STREAM_CHECK_RET_GOTO(retrieveWalData(pVnode, pTbData, pBlock, window));
    printDataBlock(pBlock, __func__, "");

    STREAM_CHECK_RET_GOTO(blockDataMerge(pBlockRet, pBlock));
    blockDataCleanup(pBlock);
  }

end:
  tDestroySubmitReq(&submit, TSDB_MSG_FLG_DECODE);
  walCloseReader(pWalReader);
  tDecoderClear(&decoder);
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
543

544
// Fills the tag / tbname pseudo columns of `pBlock` for the table identified
// by pBlock->info.id.uid.
//   pExpr / numOfExpr - expressions describing the destination slots; nothing is
//                       done when numOfExpr is 0
//   api               - storage API used for the meta lookup and tag extraction
// Each destination column is filled for all pBlock->info.rows rows: with the
// table name for tbname expressions, with the tag value for tag expressions,
// or with NULL when the value is missing.
// NOTE(review): when the meta lookup fails the error is logged, every column is
// NULL-filled (mr.me.name is NULL), and the lookup's error code is still what
// the function returns — confirm that this is intended.
static int32_t processTag(SVnode* pVnode, SExprInfo* pExpr, int32_t numOfExpr, SStorageAPI* api, SSDataBlock* pBlock) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SMetaReader mr = {0};

  if (numOfExpr == 0) {
    return TSDB_CODE_SUCCESS;
  }
  api->metaReaderFn.initReader(&mr, pVnode, META_READER_LOCK, &api->metaFn);
  code = api->metaReaderFn.getEntryGetUidCache(&mr, pBlock->info.id.uid);
  if (code != TSDB_CODE_SUCCESS) {
    stError("failed to get table meta, uid:%" PRId64 ", code:%s", pBlock->info.id.uid, tstrerror(code));
  }
  api->metaReaderFn.readerReleaseLock(&mr);
  for (int32_t j = 0; j < numOfExpr; ++j) {
    const SExprInfo* pExpr1 = &pExpr[j];
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
    STREAM_CHECK_NULL_GOTO(pColInfoData, terrno);
    if (mr.me.name == NULL) {
      colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
      continue;
    }
    colInfoDataCleanup(pColInfoData, pBlock->info.rows);

    int32_t functionId = pExpr1->pExpr->_function.functionId;

    // this is to handle the tbname
    if (fmIsScanPseudoColumnFunc(functionId)) {
      int32_t fType = pExpr1->pExpr->_function.functionType;
      if (fType == FUNCTION_TYPE_TBNAME) {
        STREAM_CHECK_RET_GOTO(setTbNameColData(pBlock, pColInfoData, functionId, mr.me.name));
        pColInfoData->info.colId = -1;
      }
    } else {  // these are tags
      STagVal tagVal = {0};
      tagVal.cid = pExpr1->base.pParam[0].pCol->colId;
      const char* p = api->metaFn.extractTagVal(mr.me.ctbEntry.pTags, pColInfoData->info.type, &tagVal);

      char* data = NULL;
      // True only when `data` came from tTagValToData for a var-length tag, which is
      // the one case the original code released. For JSON columns `data` aliases `p`,
      // so `p` must neither be reinterpreted as STagVal nor freed here.
      bool ownsData = false;
      if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
        data = tTagValToData((const STagVal*)p, false);
        ownsData = IS_VAR_DATA_TYPE(((const STagVal*)p)->type);
      } else {
        data = (char*)p;
      }

      bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
      if (isNullVal) {
        colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
      } else {
        code = colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows, false);
        // Release the temporary copy before checking the result so it cannot leak on error.
        if (ownsData) {
          taosMemoryFree(data);
        }
        STREAM_CHECK_RET_GOTO(code);
      }
    }
  }

end:
  api->metaReaderFn.clearReader(&mr);

  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
610

611
// Builds the result block for one WAL version `ver` of table `uid`:
//   1. decodes the matching submitted rows into a scratch block shaped like
//      sStreamInfo->triggerResBlock (optionally limited by `window`),
//   2. fills tag / tbname columns when any row was read,
//   3. applies the filter built from sStreamInfo->pConditions,
//   4. when isTrigger, hands the filtered block itself to *pBlock; otherwise
//      creates *pBlock from pSrcBlock and transforms the filtered block into it.
// The filter and any scratch block still owned here are released on every path.
static int32_t processWalVerData(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamInfo, int64_t ver, bool isTrigger,
                                 int64_t uid, STimeWindow* window, SSDataBlock* pSrcBlock, SSDataBlock** pBlock) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SSDataBlock* pScratch = NULL;
  SSDataBlock* pMerged = NULL;
  SFilterInfo* pFilterInfo = NULL;

  SExprInfo* pExpr = sStreamInfo->pExprInfo;
  int32_t    numOfExpr = sStreamInfo->numOfExpr;

  STREAM_CHECK_RET_GOTO(filterInitFromNode(sStreamInfo->pConditions, &pFilterInfo, 0, NULL));

  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamInfo->triggerResBlock, false, &pScratch));
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamInfo->triggerResBlock, false, &pMerged));
  if (!isTrigger) {
    STREAM_CHECK_RET_GOTO(createOneDataBlock(pSrcBlock, false, pBlock));
  }

  pMerged->info.id.uid = uid;
  pScratch->info.id.uid = uid;

  STREAM_CHECK_RET_GOTO(scanWalOneVer(pVnode, pScratch, pMerged, ver, uid, window));

  if (pMerged->info.rows > 0) {
    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(processTag(pVnode, pExpr, numOfExpr, &api, pMerged));
  }
  STREAM_CHECK_RET_GOTO(qStreamFilter(pMerged, pFilterInfo));

  if (isTrigger) {
    // Ownership moves to the caller; clear the local so cleanup does not destroy it.
    *pBlock = pMerged;
    pMerged = NULL;
  } else {
    blockDataTransform(*pBlock, pMerged);
  }

  printDataBlock(*pBlock, __func__, "processWalVerData2");

end:
  STREAM_PRINT_LOG_END(code, lino);
  filterFreeInfo(pFilterInfo);
  blockDataDestroy(pScratch);
  blockDataDestroy(pMerged);
  return code;
}
655

656
// Collects the column schemas of table `uid` into a newly created array stored in *schemas.
//
// For a child table the schema row of its super table (looked up via ctbEntry.suid) is used;
// for a normal table its own schema row is used. Any other table type is rejected.
// On success the caller owns *schemas. On failure the array is destroyed here and *schemas is
// reset to NULL, so a caller that also destroys it on its error path cannot free it twice.
// Returns 0 on success or an error code.
static int32_t buildScheamFromMeta(SVnode* pVnode, int64_t uid, SArray** schemas) {
  int32_t         code = 0;
  int32_t         lino = 0;
  SMetaReader     metaReader = {0};
  SStorageAPI     api = {0};
  SSchemaWrapper* sSchemaWrapper = NULL;

  initStorageAPI(&api);
  *schemas = taosArrayInit(8, sizeof(SSchema));
  STREAM_CHECK_NULL_GOTO(*schemas, terrno);

  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
  STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, uid));

  if (metaReader.me.type == TD_CHILD_TABLE) {
    int64_t suid = metaReader.me.ctbEntry.suid;
    // Reset the decoder before the same reader is reused for the super table entry.
    tDecoderClear(&metaReader.coder);
    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, suid));
    sSchemaWrapper = &metaReader.me.stbEntry.schemaRow;
  } else if (metaReader.me.type == TD_NORMAL_TABLE) {
    sSchemaWrapper = &metaReader.me.ntbEntry.schemaRow;
  } else {
    // No schema row is available for this type: fail instead of dereferencing a NULL wrapper.
    qError("invalid table type:%d", metaReader.me.type);
    code = TSDB_CODE_INVALID_PARA;
    lino = __LINE__;
    goto end;
  }

  for (size_t j = 0; j < sSchemaWrapper->nCols; j++) {
    SSchema* s = sSchemaWrapper->pSchema + j;
    STREAM_CHECK_NULL_GOTO(taosArrayPush(*schemas, s), terrno);
  }

end:
  api.metaReaderFn.clearReader(&metaReader);
  STREAM_PRINT_LOG_END(code, lino);
  if (code != 0) {
    taosArrayDestroy(*schemas);
    *schemas = NULL;  // do not leave a dangling pointer behind for the caller
  }
  return code;
}
691

692
// Reduces `schemas` in place to the entries whose colId appears in `cols`, ordered as in `cols`.
//
// Matching entries are appended behind the original ones, then the original entries are popped
// from the front. Column ids with no matching schema are skipped. Capacity is reserved up front,
// presumably so that pushing an element that lives inside the same array cannot reallocate it.
// Returns 0 on success or an error code.
static int32_t shrinkScheams(SArray* cols, SArray* schemas) {
  int32_t      code = 0;
  int32_t      lino = 0;
  const size_t nOrig = taosArrayGetSize(schemas);

  STREAM_CHECK_RET_GOTO(taosArrayEnsureCap(schemas, nOrig + taosArrayGetSize(cols)));

  for (size_t c = 0; c < taosArrayGetSize(cols); c++) {
    col_id_t* pWanted = taosArrayGet(cols, c);
    STREAM_CHECK_NULL_GOTO(pWanted, terrno);

    // Only the original entries are searched; the first match wins.
    for (size_t k = 0; k < nOrig; k++) {
      SSchema* pCand = taosArrayGet(schemas, k);
      STREAM_CHECK_NULL_GOTO(pCand, terrno);
      if (pCand->colId != *pWanted) {
        continue;
      }
      STREAM_CHECK_NULL_GOTO(taosArrayPush(schemas, pCand), terrno);
      break;
    }
  }
  taosArrayPopFrontBatch(schemas, nOrig);

end:
  return code;
}
714

715
// Builds the block holding the columns `cids` of table `uid` as written at WAL version `ver`.
//
// The table schema is read from meta, reduced to `cids`, and used to create two blocks that are
// passed to scanWalOneVer(). The second block is returned through *pBlock and is then not
// destroyed here. Returns 0 on success or an error code.
static int32_t processWalVerDataVTable(SVnode* pVnode, SArray *cids, int64_t ver,
  int64_t uid, STimeWindow* window, SSDataBlock** pBlock) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SArray*      schemas = NULL;
  // buildScheamFromMeta() destroys the array itself when it fails, so only destroy it here once
  // the call has succeeded; otherwise the same array could be freed twice.
  bool         ownsSchemas = false;

  SSDataBlock* pBlock1 = NULL;
  SSDataBlock* pBlock2 = NULL;

  STREAM_CHECK_RET_GOTO(buildScheamFromMeta(pVnode, uid, &schemas));
  ownsSchemas = true;
  STREAM_CHECK_RET_GOTO(shrinkScheams(cids, schemas));
  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, &pBlock1));
  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, &pBlock2));

  pBlock2->info.id.uid = uid;
  pBlock1->info.id.uid = uid;

  STREAM_CHECK_RET_GOTO(scanWalOneVer(pVnode, pBlock1, pBlock2, ver, uid, window));
  printDataBlock(pBlock2, __func__, "");

  *pBlock = pBlock2;
  pBlock2 = NULL;

end:
  STREAM_PRINT_LOG_END(code, lino);
  blockDataDestroy(pBlock1);
  blockDataDestroy(pBlock2);
  if (ownsSchemas) {
    taosArrayDestroy(schemas);
  }
  return code;
}
745

746
static int32_t createTSAndCondition(int64_t start, int64_t end, SLogicConditionNode** pCond,
263,260✔
747
                                    STargetNode* pTargetNodeTs) {
748
  int32_t code = 0;
263,260✔
749
  int32_t lino = 0;
263,260✔
750

751
  SColumnNode*         pCol = NULL;
263,260✔
752
  SColumnNode*         pCol1 = NULL;
263,260✔
753
  SValueNode*          pVal = NULL;
263,260✔
754
  SValueNode*          pVal1 = NULL;
263,260✔
755
  SOperatorNode*       op = NULL;
263,260✔
756
  SOperatorNode*       op1 = NULL;
263,260✔
757
  SLogicConditionNode* cond = NULL;
263,260✔
758

759
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pCol));
263,260!
760
  pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
263,260✔
761
  pCol->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
263,260✔
762
  pCol->node.resType.bytes = LONG_BYTES;
263,260✔
763
  pCol->slotId = pTargetNodeTs->slotId;
263,260✔
764
  pCol->dataBlockId = pTargetNodeTs->dataBlockId;
263,260✔
765

766
  STREAM_CHECK_RET_GOTO(nodesCloneNode((SNode*)pCol, (SNode**)&pCol1));
263,260!
767

768
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&pVal));
263,260!
769
  pVal->node.resType.type = TSDB_DATA_TYPE_BIGINT;
263,260✔
770
  pVal->node.resType.bytes = LONG_BYTES;
263,260✔
771
  pVal->datum.i = start;
263,260✔
772
  pVal->typeData = start;
263,260✔
773

774
  STREAM_CHECK_RET_GOTO(nodesCloneNode((SNode*)pVal, (SNode**)&pVal1));
263,260!
775
  pVal1->datum.i = end;
263,260✔
776
  pVal1->typeData = end;
263,260✔
777

778
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&op));
263,260!
779
  op->opType = OP_TYPE_GREATER_EQUAL;
263,260✔
780
  op->node.resType.type = TSDB_DATA_TYPE_BOOL;
263,260✔
781
  op->node.resType.bytes = CHAR_BYTES;
263,260✔
782
  op->pLeft = (SNode*)pCol;
263,260✔
783
  op->pRight = (SNode*)pVal;
263,260✔
784
  pCol = NULL;
263,260✔
785
  pVal = NULL;
263,260✔
786

787
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&op1));
263,260!
788
  op1->opType = OP_TYPE_LOWER_EQUAL;
263,260✔
789
  op1->node.resType.type = TSDB_DATA_TYPE_BOOL;
263,260✔
790
  op1->node.resType.bytes = CHAR_BYTES;
263,260✔
791
  op1->pLeft = (SNode*)pCol1;
263,260✔
792
  op1->pRight = (SNode*)pVal1;
263,260✔
793
  pCol1 = NULL;
263,260✔
794
  pVal1 = NULL;
263,260✔
795

796
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_LOGIC_CONDITION, (SNode**)&cond));
263,260!
797
  cond->condType = LOGIC_COND_TYPE_AND;
263,260✔
798
  cond->node.resType.type = TSDB_DATA_TYPE_BOOL;
263,260✔
799
  cond->node.resType.bytes = CHAR_BYTES;
263,260✔
800
  STREAM_CHECK_RET_GOTO(nodesMakeList(&cond->pParameterList));
263,260!
801
  STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)op));
263,260!
802
  op = NULL;
263,260✔
803
  STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)op1));
263,260!
804
  op1 = NULL;
263,260✔
805

806
  *pCond = cond;
263,260✔
807

808
end:
263,260✔
809
  if (code != 0) {
263,260!
810
    nodesDestroyNode((SNode*)pCol);
×
811
    nodesDestroyNode((SNode*)pCol1);
×
812
    nodesDestroyNode((SNode*)pVal);
×
813
    nodesDestroyNode((SNode*)pVal1);
×
814
    nodesDestroyNode((SNode*)op);
×
815
    nodesDestroyNode((SNode*)op1);
×
816
    nodesDestroyNode((SNode*)cond);
×
817
  }
818
  STREAM_PRINT_LOG_END(code, lino);
263,260!
819

820
  return code;
263,260✔
821
}
822

823
// Builds an OR condition with one `ts >= skey AND ts <= ekey` term per entry of
// data->pStreamPesudoFuncVals and returns it through *pCond.
//
// For each entry, data->curIdx is set and calcTimeRange() computes the window; entries whose
// range is reported invalid are logged and skipped. On failure the partially built tree is
// destroyed and *pCond is left untouched.
// Returns 0 on success, TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN when pTargetNodeTs is NULL, or the
// error code of the failing step.
static int32_t createExternalConditions(SStreamRuntimeFuncInfo* data, SLogicConditionNode** pCond, STargetNode* pTargetNodeTs, STimeRangeNode* node) {
  int32_t              code = 0;
  int32_t              lino = 0;
  SLogicConditionNode* pTerm = NULL;  // AND term currently being built
  SLogicConditionNode* pOr = NULL;    // resulting OR node

  if (pTargetNodeTs == NULL) {
    vError("stream reader %s no ts column", __func__);
    return TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN;
  }

  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_LOGIC_CONDITION, (SNode**)&pOr));
  pOr->condType = LOGIC_COND_TYPE_OR;
  pOr->node.resType.type = TSDB_DATA_TYPE_BOOL;
  pOr->node.resType.bytes = CHAR_BYTES;
  STREAM_CHECK_RET_GOTO(nodesMakeList(&pOr->pParameterList));

  for (int i = 0; i < taosArrayGetSize(data->pStreamPesudoFuncVals); ++i) {
    // Only winRange / winRangeValid of this handle are used, as outputs of calcTimeRange().
    SReadHandle tmp = {0};

    data->curIdx = i;
    calcTimeRange(node, data, &tmp.winRange, &tmp.winRangeValid);
    if (tmp.winRangeValid) {
      STREAM_CHECK_RET_GOTO(createTSAndCondition(tmp.winRange.skey, tmp.winRange.ekey, &pTerm, pTargetNodeTs));
      stDebug("%s create condition skey:%" PRId64 ", eksy:%" PRId64, __func__, tmp.winRange.skey, tmp.winRange.ekey);
      STREAM_CHECK_RET_GOTO(nodesListAppend(pOr->pParameterList, (SNode*)pTerm));
      pTerm = NULL;  // now owned by the list
    } else {
      stError("stream reader %s invalid time range, skey:%" PRId64 ", ekey:%" PRId64, __func__, tmp.winRange.skey,
              tmp.winRange.ekey);
    }
  }

  *pCond = pOr;

end:
  if (code != 0) {
    nodesDestroyNode((SNode*)pTerm);
    nodesDestroyNode((SNode*)pOr);
  }
  STREAM_PRINT_LOG_END(code, lino);

  return code;
}
866

867
static int32_t processCalaTimeRange(SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo, SResFetchReq* req,
15,118✔
868
                                    STimeRangeNode* node, SReadHandle* handle) {
869
  int32_t code = 0;
15,118✔
870
  int32_t lino = 0;
15,118✔
871
  SArray* funcVals = NULL;
15,118✔
872
  if (req->pStRtFuncInfo->withExternalWindow) {
15,118✔
873
    nodesDestroyNode(sStreamReaderCalcInfo->tsConditions);
217✔
874
    filterFreeInfo(sStreamReaderCalcInfo->pFilterInfo);
217✔
875
    sStreamReaderCalcInfo->pFilterInfo = NULL;
217✔
876

877
    STREAM_CHECK_RET_GOTO(createExternalConditions(req->pStRtFuncInfo,
217!
878
                                                   (SLogicConditionNode**)&sStreamReaderCalcInfo->tsConditions,
879
                                                   sStreamReaderCalcInfo->pTargetNodeTs, node));
880

881
    STREAM_CHECK_RET_GOTO(filterInitFromNode((SNode*)sStreamReaderCalcInfo->tsConditions,
217!
882
                                             (SFilterInfo**)&sStreamReaderCalcInfo->pFilterInfo,
883
                                             FLT_OPTION_NO_REWRITE | FLT_OPTION_SCALAR_MODE, NULL));
884
    SSTriggerCalcParam* pFirst = taosArrayGet(req->pStRtFuncInfo->pStreamPesudoFuncVals, 0);
217✔
885
    SSTriggerCalcParam* pLast = taosArrayGetLast(req->pStRtFuncInfo->pStreamPesudoFuncVals);
217✔
886
    STREAM_CHECK_NULL_GOTO(pFirst, terrno);
217!
887
    STREAM_CHECK_NULL_GOTO(pLast, terrno);
217!
888

889
    handle->winRange.skey = pFirst->wstart;
217✔
890
    handle->winRange.ekey = pLast->wend;
217✔
891
    handle->winRangeValid = true;
217✔
892
    stDebug("%s withExternalWindow is true, skey:%" PRId64 ", ekey:%" PRId64, __func__, pFirst->wstart, pLast->wend);
217✔
893
  } else {
894
    calcTimeRange(node, req->pStRtFuncInfo, &handle->winRange, &handle->winRangeValid);
14,901✔
895
  }
896

897
end:
15,118✔
898
  taosArrayDestroy(funcVals);
15,118✔
899
  return code;
15,118✔
900
}
901

902
// Creates the data block used for WAL meta rows and returns it through *pBlock.
//
// Columns, in order: type (TINYINT), gid (BIGINT, omitted when isVTable is true), uid, skey,
// ekey, ver, nrows (all BIGINT). The temporary schema array is always destroyed before return.
// Returns 0 on success or an error code.
static int32_t createBlockForWalMeta(SSDataBlock** pBlock, bool isVTable) {
  int32_t code = 0;
  int32_t lino = 0;
  SArray* pCols = taosArrayInit(8, sizeof(SSchema));

  STREAM_CHECK_NULL_GOTO(pCols, terrno);

  int32_t slot = 0;
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(pCols, TSDB_DATA_TYPE_TINYINT, CHAR_BYTES, slot++));  // type
  if (!isVTable) {
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(pCols, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, slot++));  // gid
  }
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(pCols, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, slot++));  // uid
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(pCols, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, slot++));  // skey
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(pCols, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, slot++));  // ekey
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(pCols, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, slot++));  // ver
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(pCols, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, slot++));  // nrows

  STREAM_CHECK_RET_GOTO(createDataBlockForStream(pCols, pBlock));

end:
  taosArrayDestroy(pCols);
  return code;
}
927

928
static int32_t createOptionsForLastTs(SStreamTriggerReaderTaskInnerOptions* options,
422✔
929
                                      SStreamTriggerReaderInfo*             sStreamReaderInfo) {
930
  int32_t code = 0;
422✔
931
  int32_t lino = 0;
422✔
932
  SArray* schemas = NULL;
422✔
933

934
  schemas = taosArrayInit(4, sizeof(SSchema));
422✔
935
  STREAM_CHECK_NULL_GOTO(schemas, terrno)
425!
936
  STREAM_CHECK_RET_GOTO(
425!
937
      qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, PRIMARYKEY_TIMESTAMP_COL_ID))  // last ts
938

939
  BUILD_OPTION(op, sStreamReaderInfo, -1, true, TSDB_ORDER_DESC, INT64_MIN, INT64_MAX, schemas, true,
425✔
940
               STREAM_SCAN_GROUP_ONE_BY_ONE, 0, sStreamReaderInfo->uidList == NULL, NULL);
941
  schemas = NULL;
425✔
942
  *options = op;
425✔
943

944
end:
425✔
945
  taosArrayDestroy(schemas);
425✔
946
  return code;
425✔
947
}
948

949
static int32_t createOptionsForFirstTs(SStreamTriggerReaderTaskInnerOptions* options,
390✔
950
                                       SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t start, int64_t ver) {
951
  int32_t code = 0;
390✔
952
  int32_t lino = 0;
390✔
953
  SArray* schemas = NULL;
390✔
954

955
  schemas = taosArrayInit(4, sizeof(SSchema));
390✔
956
  STREAM_CHECK_NULL_GOTO(schemas, terrno)
390!
957
  STREAM_CHECK_RET_GOTO(
390!
958
      qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, PRIMARYKEY_TIMESTAMP_COL_ID))  // first ts
959

960
  BUILD_OPTION(op, sStreamReaderInfo, ver, true, TSDB_ORDER_ASC, start, INT64_MAX, schemas, true,
390✔
961
               STREAM_SCAN_GROUP_ONE_BY_ONE, 0, sStreamReaderInfo->uidList == NULL, NULL);
962
  schemas = NULL;
390✔
963

964
  *options = op;
390✔
965
end:
390✔
966
  taosArrayDestroy(schemas);
390✔
967
  return code;
390✔
968
}
969

970
static int32_t createOptionsForTsdbMeta(SStreamTriggerReaderTaskInnerOptions* options,
1,244✔
971
                                        SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t start, int64_t end,
972
                                        int64_t gid, int8_t order, int64_t ver, bool onlyTs) {
973
  int32_t code = 0;
1,244✔
974
  int32_t lino = 0;
1,244✔
975
  SArray* schemas = NULL;
1,244✔
976

977
  int32_t index = 1;
1,244✔
978
  schemas = taosArrayInit(8, sizeof(SSchema));
1,244✔
979
  STREAM_CHECK_NULL_GOTO(schemas, terrno);
1,244!
980
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, index++))  // skey
1,244!
981
  if (!onlyTs){
1,244✔
982
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, index++))  // ekey
618!
983
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // uid
618!
984
    if (sStreamReaderInfo->uidList == NULL) {
618✔
985
      STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_UBIGINT, LONG_BYTES, index++))  // gid
129!
986
    }
987
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))     // nrows
618!
988
  }
989
  
990
  BUILD_OPTION(op, sStreamReaderInfo, ver, true, order, start, end, schemas, true, (gid != 0 ? STREAM_SCAN_GROUP_ONE_BY_ONE : STREAM_SCAN_ALL), gid,
1,244✔
991
               true, sStreamReaderInfo->uidList);
992
  schemas = NULL;
1,244✔
993
  *options = op;
1,244✔
994

995
end:
1,244✔
996
  taosArrayDestroy(schemas);
1,244✔
997
  return code;
1,244✔
998
}
999

1000
// Ascending comparator for int64_t keys, in the shape expected by sort routines.
//
// Only the first int64_t behind each pointer is read. Returns -1 when *elem1 < *elem2, 1 when
// *elem1 > *elem2 and 0 when they are equal. The inputs are read through const pointers and no
// subtraction is used, so extreme values cannot overflow.
static int taosCompareInt64Asc(const void* elem1, const void* elem2) {
  const int64_t lhs = *(const int64_t*)elem1;
  const int64_t rhs = *(const int64_t*)elem2;

  return (lhs > rhs) - (lhs < rhs);
}
1010

1011
static int32_t getTableList(SArray** pList, int32_t* pNum, int64_t* suid, int32_t index,
188✔
1012
                            SStreamTriggerReaderInfo* sStreamReaderInfo) {
1013
  int32_t code = 0;
188✔
1014
  int32_t lino = 0;
188✔
1015

1016
  int32_t* start = taosArrayGet(sStreamReaderInfo->uidListIndex, index);
188✔
1017
  STREAM_CHECK_NULL_GOTO(start, terrno);
188!
1018
  int32_t  iStart = *start;
188✔
1019
  int32_t* end = taosArrayGet(sStreamReaderInfo->uidListIndex, index + 1);
188✔
1020
  STREAM_CHECK_NULL_GOTO(end, terrno);
188!
1021
  int32_t iEnd = *end;
188✔
1022
  *pList = taosArrayInit(iEnd - iStart, sizeof(STableKeyInfo));
188✔
1023
  STREAM_CHECK_NULL_GOTO(*pList, terrno);
188!
1024
  for (int32_t i = iStart; i < iEnd; ++i) {
702✔
1025
    int64_t*       uid = taosArrayGet(sStreamReaderInfo->uidList, i);
514✔
1026
    STableKeyInfo* info = taosArrayReserve(*pList, 1);
514✔
1027
    STREAM_CHECK_NULL_GOTO(info, terrno);
514!
1028
    *suid = uid[0];
514✔
1029
    info->uid = uid[1];
514✔
1030
  }
1031
  *pNum = iEnd - iStart;
188✔
1032
end:
188✔
1033
  STREAM_PRINT_LOG_END(code, lino);
188!
1034
  return code;
188✔
1035
}
1036

1037
static int32_t processTsNonVTable(SVnode* pVnode, SStreamTsResponse* tsRsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
626✔
1038
                                  SStreamReaderTaskInner* pTask) {
1039
  int32_t code = 0;
626✔
1040
  int32_t lino = 0;
626✔
1041

1042
  tsRsp->tsInfo = taosArrayInit(qStreamGetTableListGroupNum(pTask->pTableList), sizeof(STsInfo));
626✔
1043
  STREAM_CHECK_NULL_GOTO(tsRsp->tsInfo, terrno);
627!
1044
  while (true) {
690✔
1045
    bool hasNext = false;
1,317✔
1046
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTask, &hasNext));
1,317!
1047
    if (hasNext) {
1,317✔
1048
      pTask->api.tsdReader.tsdReaderReleaseDataBlock(pTask->pReader);
713✔
1049
      STsInfo* tsInfo = taosArrayReserve(tsRsp->tsInfo, 1);
713✔
1050
      STREAM_CHECK_NULL_GOTO(tsInfo, terrno)
713!
1051
      if (pTask->options.order == TSDB_ORDER_ASC) {
713✔
1052
        tsInfo->ts = pTask->pResBlock->info.window.skey;
575✔
1053
      } else {
1054
        tsInfo->ts = pTask->pResBlock->info.window.ekey;
138✔
1055
      }
1056
      tsInfo->gId = qStreamGetGroupId(pTask->pTableList, pTask->pResBlock->info.id.uid);
713✔
1057
      stDebug("vgId:%d %s get last ts:%" PRId64 ", gId:%" PRIu64 ", ver:%" PRId64, TD_VID(pVnode), __func__, tsInfo->ts,
713✔
1058
              tsInfo->gId, tsRsp->ver);
1059
    }
1060

1061
    pTask->currentGroupIndex++;
1,317✔
1062
    if (pTask->currentGroupIndex >= qStreamGetTableListGroupNum(pTask->pTableList)) {
1,317✔
1063
      break;
627✔
1064
    }
1065
    STREAM_CHECK_RET_GOTO(resetTsdbReader(pTask));
689!
1066
  }
1067

1068
end:
627✔
1069
  return code;
627✔
1070
}
1071

1072
static int32_t processTsVTable(SVnode* pVnode, SStreamTsResponse* tsRsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
188✔
1073
                               SStreamReaderTaskInner* pTask, int64_t ver) {
1074
  int32_t code = 0;
188✔
1075
  int32_t lino = 0;
188✔
1076
  int64_t suid = 0;
188✔
1077
  SArray* pList = NULL;
188✔
1078
  int32_t pNum = 0;
188✔
1079

1080
  tsRsp->tsInfo = taosArrayInit(taosArrayGetSize(sStreamReaderInfo->uidList), sizeof(STsInfo));
188✔
1081
  STREAM_CHECK_NULL_GOTO(tsRsp->tsInfo, terrno);
188!
1082
  for (int32_t i = 0; i < taosArrayGetSize(sStreamReaderInfo->uidListIndex) - 1; ++i) {
376✔
1083
    STREAM_CHECK_RET_GOTO(getTableList(&pList, &pNum, &suid, i, sStreamReaderInfo));
188!
1084

1085
    cleanupQueryTableDataCond(&pTask->cond);
188✔
1086
    STREAM_CHECK_RET_GOTO(qStreamInitQueryTableDataCond(&pTask->cond, pTask->options.order, pTask->options.schemas,
188!
1087
                                                        pTask->options.isSchema, pTask->options.twindows, suid, ver));
1088
    STREAM_CHECK_RET_GOTO(pTask->api.tsdReader.tsdReaderOpen(
188!
1089
        pVnode, &pTask->cond, taosArrayGet(pList, 0), pNum, pTask->pResBlock, (void**)&pTask->pReader, pTask->idStr, NULL));
1090
    taosArrayDestroy(pList);
188✔
1091
    pList = NULL;
188✔
1092
    while (true) {
425✔
1093
      bool hasNext = false;
613✔
1094
      STREAM_CHECK_RET_GOTO(getTableDataInfo(pTask, &hasNext));
613!
1095
      if (!hasNext) {
613✔
1096
        break;
188✔
1097
      }
1098
      pTask->api.tsdReader.tsdReaderReleaseDataBlock(pTask->pReader);
425✔
1099
      STsInfo* tsInfo = taosArrayReserve(tsRsp->tsInfo, 1);
425✔
1100
      STREAM_CHECK_NULL_GOTO(tsInfo, terrno)
425!
1101
      if (pTask->options.order == TSDB_ORDER_ASC) {
425✔
1102
        tsInfo->ts = pTask->pResBlock->info.window.skey;
169✔
1103
      } else {
1104
        tsInfo->ts = pTask->pResBlock->info.window.ekey;
256✔
1105
      }
1106
      tsInfo->gId = pTask->pResBlock->info.id.uid;
425✔
1107
      stDebug("vgId:%d %s get vtable last ts:%" PRId64 ", uid:%" PRIu64 ", ver:%" PRId64, TD_VID(pVnode), __func__,
425✔
1108
              tsInfo->ts, tsInfo->gId, tsRsp->ver);
1109
    }
1110
    pTask->api.tsdReader.tsdReaderClose(pTask->pReader);
188✔
1111
    pTask->pReader = NULL;
188✔
1112
  }
1113

1114
end:
188✔
1115
  taosArrayDestroy(pList);
188✔
1116
  return code;
188✔
1117
}
1118

1119
// Re-targets `options` at table `uid`.
//
// A non-zero `suid` replaces options->suid; a zero `suid` keeps the current value. The table
// type is then TD_CHILD_TABLE when the resulting suid is non-zero and TD_NORMAL_TABLE otherwise.
static void reSetUid(SStreamTriggerReaderTaskInnerOptions* options, int64_t suid, int64_t uid) {
  options->uid = uid;
  if (suid != 0) {
    options->suid = suid;
  }
  options->tableType = (options->suid != 0) ? TD_CHILD_TABLE : TD_NORMAL_TABLE;
}
394✔
1128

1129
static int32_t createOptionsForTsdbData(SVnode* pVnode, SStreamTriggerReaderTaskInnerOptions* options,
353✔
1130
                                        SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t uid, SArray* cols,
1131
                                        int8_t order,int64_t skey, int64_t ekey, int64_t ver) {
1132
  int32_t code = 0;
353✔
1133
  int32_t lino = 0;
353✔
1134
  SArray* schemas = NULL;
353✔
1135

1136
  STREAM_CHECK_RET_GOTO(buildScheamFromMeta(pVnode, uid, &schemas));
353!
1137
  STREAM_CHECK_RET_GOTO(shrinkScheams(cols, schemas));
353!
1138
  BUILD_OPTION(op, sStreamReaderInfo, ver, true, order, skey, ekey, schemas, true, STREAM_SCAN_ALL, 0, false, NULL);
353✔
1139
  *options = op;
353✔
1140

1141
end:
353✔
1142
  return code;
353✔
1143
}
1144

1145
// Handles a "set table" pull request: installs the request's uid list on the reader and
// rebuilds the derived lookup structures, then sends the response.
//
//  - uidList      : swapped with req->setTableReq.uids and sorted with taosCompareInt64Asc.
//  - uidListIndex : cleared or created; receives the position of every element whose first
//                   value is 0 or differs from the previous element's first value, followed by
//                   the total element count.
//  - uidHash      : cleared or created; maps each element's second value to its position.
// A response carrying `code` is always sent, also on failure. Returns that same code.
static int32_t vnodeProcessStreamSetTableReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;

  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  TSWAP(sStreamReaderInfo->uidList, req->setTableReq.uids);
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidList, TSDB_CODE_INVALID_PARA);

  taosArraySort(sStreamReaderInfo->uidList, taosCompareInt64Asc);

  // Reuse the index array when it already exists, otherwise create it.
  if (sStreamReaderInfo->uidListIndex == NULL) {
    sStreamReaderInfo->uidListIndex = taosArrayInit(taosArrayGetSize(sStreamReaderInfo->uidList), sizeof(int32_t));
    STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidListIndex, TSDB_CODE_INVALID_PARA);
  } else {
    taosArrayClear(sStreamReaderInfo->uidListIndex);
  }

  // Same for the uid -> position hash.
  if (sStreamReaderInfo->uidHash == NULL) {
    sStreamReaderInfo->uidHash = taosHashInit(taosArrayGetSize(sStreamReaderInfo->uidList),
                                              taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidHash, TSDB_CODE_INVALID_PARA);
  } else {
    taosHashClear(sStreamReaderInfo->uidHash);
  }

  int64_t prevSuid = 0;
  int32_t nUids = taosArrayGetSize(sStreamReaderInfo->uidList);
  for (int32_t pos = 0; pos < nUids; ++pos) {
    int64_t* pPair = taosArrayGet(sStreamReaderInfo->uidList, pos);
    STREAM_CHECK_NULL_GOTO(pPair, terrno);

    // A new group starts whenever the first value is 0 or changes.
    if (pPair[0] == 0 || pPair[0] != prevSuid) {
      STREAM_CHECK_NULL_GOTO(taosArrayPush(sStreamReaderInfo->uidListIndex, &pos), terrno);
    }
    prevSuid = pPair[0];
    stDebug("vgId:%d %s suid:%" PRId64 ",uid:%" PRId64 ", index:%d", TD_VID(pVnode), __func__, prevSuid, pPair[1], pos);
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->uidHash, pPair + 1, LONG_BYTES, &pos, sizeof(int32_t)));
  }
  // Closing entry: the end position of the last group.
  STREAM_CHECK_NULL_GOTO(taosArrayPush(sStreamReaderInfo->uidListIndex, &nUids), terrno);

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {0};
  rsp.msgType = TDMT_STREAM_TRIGGER_PULL_RSP;
  rsp.info = pMsg->info;
  rsp.pCont = buf;
  rsp.contLen = size;
  rsp.code = code;
  tmsgSendRsp(&rsp);
  return code;
}
1195

1196
// Handles a "last ts" pull request.
//
// A reader task is created from createOptionsForLastTs(); the timestamps are collected by
// processTsVTable() when the reader has a uidList and by processTsNonVTable() otherwise, with
// the response version taken from pVnode->state.applied. The result is serialised by
// buildTsRsp() and sent. A response carrying `code` is always sent, also on failure; the
// collected entries and the reader task are released afterwards. Returns that same code.
static int32_t vnodeProcessStreamLastTsReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  void*                   buf = NULL;
  size_t                  size = 0;
  SStreamReaderTaskInner* pTaskInner = NULL;
  SStreamTsResponse       lastTsRsp = {0};

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;

  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStreamTriggerReaderTaskInnerOptions options = {0};
  STREAM_CHECK_RET_GOTO(createOptionsForLastTs(&options, sStreamReaderInfo));
  SStorageAPI api = {0};
  initStorageAPI(&api);
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, NULL, &api));

  lastTsRsp.ver = pVnode->state.applied;
  if (sStreamReaderInfo->uidList == NULL) {
    STREAM_CHECK_RET_GOTO(processTsNonVTable(pVnode, &lastTsRsp, sStreamReaderInfo, pTaskInner));
  } else {
    STREAM_CHECK_RET_GOTO(processTsVTable(pVnode, &lastTsRsp, sStreamReaderInfo, pTaskInner, -1));
  }
  ST_TASK_DLOG("vgId:%d %s get result", TD_VID(pVnode), __func__);
  STREAM_CHECK_RET_GOTO(buildTsRsp(&lastTsRsp, &buf, &size));

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {0};
  rsp.msgType = TDMT_STREAM_TRIGGER_PULL_RSP;
  rsp.info = pMsg->info;
  rsp.pCont = buf;
  rsp.contLen = size;
  rsp.code = code;
  tmsgSendRsp(&rsp);
  taosArrayDestroy(lastTsRsp.tsInfo);
  releaseStreamTask(&pTaskInner);
  return code;
}
1234

1235
// Handles a "first ts" pull request.
//
// A reader task is created from createOptionsForFirstTs() using the request's start time and
// version; the timestamps are collected by processTsVTable() when the reader has a uidList and
// by processTsNonVTable() otherwise, with the response version taken from
// pVnode->state.applied. The result is serialised by buildTsRsp() and sent. A response carrying
// `code` is always sent, also on failure; the collected entries and the reader task are
// released afterwards. Returns that same code.
static int32_t vnodeProcessStreamFirstTsReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  void*                   buf = NULL;
  size_t                  size = 0;
  SStreamReaderTaskInner* pTaskInner = NULL;
  SStreamTsResponse       firstTsRsp = {0};

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStreamTriggerReaderTaskInnerOptions options = {0};
  STREAM_CHECK_RET_GOTO(createOptionsForFirstTs(&options, sStreamReaderInfo, req->firstTsReq.startTime, req->firstTsReq.ver));
  SStorageAPI api = {0};
  initStorageAPI(&api);
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, NULL, &api));

  firstTsRsp.ver = pVnode->state.applied;
  if (sStreamReaderInfo->uidList == NULL) {
    STREAM_CHECK_RET_GOTO(processTsNonVTable(pVnode, &firstTsRsp, sStreamReaderInfo, pTaskInner));
  } else {
    STREAM_CHECK_RET_GOTO(processTsVTable(pVnode, &firstTsRsp, sStreamReaderInfo, pTaskInner, req->firstTsReq.ver));
  }

  ST_TASK_DLOG("vgId:%d %s get result", TD_VID(pVnode), __func__);
  STREAM_CHECK_RET_GOTO(buildTsRsp(&firstTsRsp, &buf, &size));

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {0};
  rsp.msgType = TDMT_STREAM_TRIGGER_PULL_RSP;
  rsp.info = pMsg->info;
  rsp.pCont = buf;
  rsp.contLen = size;
  rsp.code = code;
  tmsgSendRsp(&rsp);
  taosArrayDestroy(firstTsRsp.tsInfo);
  releaseStreamTask(&pTaskInner);
  return code;
}
1271

1272
// Handles a TSDB meta pull request: returns one row (skey, ekey, uid, [gid], nrows) per data
// block reported by the reader, at most STREAM_RETURN_ROWS_NUM rows per response.
//
// For a STRIGGER_PULL_TSDB_META request a new reader task is created and registered in
// sStreamReaderInfo->streamTaskMap under the session key; otherwise the task registered for
// that key is looked up. The gid column is only emitted when the reader has no uidList. Once
// the reader reports no more data the task is removed from the map.
// A response carrying `code` is always sent, also on failure. Returns that same code.
// NOTE(review): if createStreamTask() fails, nothing here releases optionsTs.schemas --
// presumably createStreamTask() takes care of it; confirm.
static int32_t vnodeProcessStreamTsdbMetaReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;
  SStreamTriggerReaderTaskInnerOptions options = {0};
  SStreamReaderTaskInner*              pTaskInner = NULL;
  // True only between a successful createStreamTask() and the task's registration in the map;
  // a task in that state has no other owner and must be released here on failure.
  bool                                 ownsTask = false;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_META);

  if (req->base.type == STRIGGER_PULL_TSDB_META) {
    SStreamTriggerReaderTaskInnerOptions optionsTs = {0};

    // The reader itself only needs the timestamp column ...
    STREAM_CHECK_RET_GOTO(createOptionsForTsdbMeta(&optionsTs, sStreamReaderInfo, req->tsdbMetaReq.startTime,
      req->tsdbMetaReq.endTime, req->tsdbMetaReq.gid, req->tsdbMetaReq.order, req->tsdbMetaReq.ver, true));
    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &optionsTs, &pTaskInner, NULL, sStreamReaderInfo->groupIdMap, &api));
    ownsTask = true;
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
    ownsTask = false;

    // ... while the response block uses the full meta schema.
    STREAM_CHECK_RET_GOTO(createOptionsForTsdbMeta(&options, sStreamReaderInfo, req->tsdbMetaReq.startTime,
      req->tsdbMetaReq.endTime, req->tsdbMetaReq.gid, req->tsdbMetaReq.order, req->tsdbMetaReq.ver, false));
    STREAM_CHECK_RET_GOTO(createDataBlockForStream(options.schemas, &pTaskInner->pResBlockDst));
  } else {
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  }

  blockDataCleanup(pTaskInner->pResBlockDst);
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pTaskInner->pResBlockDst, STREAM_RETURN_ROWS_NUM));
  bool hasNext = true;
  while (true) {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
    if (!hasNext) {
      break;
    }
    pTaskInner->api.tsdReader.tsdReaderReleaseDataBlock(pTaskInner->pReader);
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);

    int32_t index = 0;
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.window.skey));
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.window.ekey));
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.id.uid));
    if (sStreamReaderInfo->uidList == NULL) {
      STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.id.groupId));
    }
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.rows));

    stDebug("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
    pTaskInner->pResBlockDst->info.rows++;
    // Stop at the page size; hasNext stays true, so the task is kept in the map.
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
      break;
    }
  }

  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
  if (!hasNext) {
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  if (ownsTask) {
    // Registration in the map failed: release the task the same way the ts handlers do.
    releaseStreamTask(&pTaskInner);
  }
  taosArrayDestroy(options.schemas);
  return code;
}
1349

1350
/*
 * Serve a pull request described by req->tsdbTsDataReq (ver, skey, ekey, suid, uid).
 *
 * A reader task is created for the trigger columns; every block it yields is
 * tag-processed (when it has rows), filtered and merged into
 * pTaskInner->pResBlockDst. The merged block is then handed to
 * blockDataTransform() together with a block created from
 * sStreamReaderInfo->tsBlock, and that block is encoded as the response payload.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP carrying `code` is sent on every path,
 * including failures. The reader task and the result block are released
 * before returning.
 *
 * Returns 0 on success, otherwise the code of the first failing step.
 */
static int32_t vnodeProcessStreamTsdbTsDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  size_t                  size = 0;
  void*                   buf = NULL;
  SSDataBlock*            pBlockRes = NULL;
  SStreamReaderTaskInner* pTaskInner = NULL;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  BUILD_OPTION(options, sStreamReaderInfo, req->tsdbTsDataReq.ver, true, TSDB_ORDER_ASC, req->tsdbTsDataReq.skey,
               req->tsdbTsDataReq.ekey, sStreamReaderInfo->triggerCols, false, STREAM_SCAN_ALL, 0, true, NULL);
  reSetUid(&options, req->tsdbTsDataReq.suid, req->tsdbTsDataReq.uid);

  SStorageAPI api = {0};
  initStorageAPI(&api);

  STREAM_CHECK_RET_GOTO(
      createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, NULL, &api));
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->tsBlock, false, &pBlockRes));

  // Drain the reader: one iteration per block until getTableDataInfo reports nothing left.
  for (;;) {
    bool more = false;
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &more));
    if (!more) {
      break;
    }

    SSDataBlock* pCur = pTaskInner->pResBlock;
    pCur->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pCur->info.id.uid);

    SSDataBlock* pBlock = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));

    bool hasRows = (pBlock != NULL) && (pBlock->info.rows > 0);
    if (hasRows) {
      STREAM_CHECK_RET_GOTO(
          processTag(pVnode, sStreamReaderInfo->pExprInfo, sStreamReaderInfo->numOfExpr, &api, pBlock));
    }

    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, pTaskInner->pFilterInfo));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
    ST_TASK_DLOG("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
                 TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey,
                 pTaskInner->pResBlock->info.window.ekey, pTaskInner->pResBlock->info.id.uid,
                 pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
  }

  // NOTE(review): unlike the calc-data handler, no blockDataEnsureCapacity() precedes this call - confirm intended.
  blockDataTransform(pBlockRes, pTaskInner->pResBlockDst);

  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  // The response is sent on every path; on failure it carries whatever buf/size hold at that point.
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP,
      .info = pMsg->info,
      .pCont = buf,
      .contLen = size,
      .code = code,
  };
  tmsgSendRsp(&rsp);

  blockDataDestroy(pBlockRes);
  releaseStreamTask(&pTaskInner);
  return code;
}
1407

1408
/*
 * Serve a trigger-data pull request and its follow-up requests.
 *
 * When req->base.type is STRIGGER_PULL_TSDB_TRIGGER_DATA a new reader task is
 * created and stored in sStreamReaderInfo->streamTaskMap under the session key;
 * any other type looks the task up again under that key and fails with
 * TSDB_CODE_STREAM_NO_CONTEXT if it is missing.
 *
 * At most one data block is merged into pTaskInner->pResBlockDst per call
 * (see the "todo" break below). Once the reader reports no more data the task
 * is removed from the map.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP carrying `code` is sent on every path.
 * Returns 0 on success, otherwise the code of the first failing step.
 */
static int32_t vnodeProcessStreamTsdbTriggerDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  SStreamReaderTaskInner* pTaskInner = NULL;
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  int64_t key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_TRIGGER_DATA);

  if (req->base.type == STRIGGER_PULL_TSDB_TRIGGER_DATA) {
    // First request of a session: scan a single group when gid != 0, otherwise scan everything.
    BUILD_OPTION(options, sStreamReaderInfo, req->tsdbTriggerDataReq.ver, true, req->tsdbTriggerDataReq.order, req->tsdbTriggerDataReq.startTime, INT64_MAX,
                 sStreamReaderInfo->triggerCols, false, (req->tsdbTriggerDataReq.gid != 0 ? STREAM_SCAN_GROUP_ONE_BY_ONE : STREAM_SCAN_ALL),
                 req->tsdbTriggerDataReq.gid, true, NULL);
    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock,
                                           sStreamReaderInfo->groupIdMap, &api));
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
  } else {
    // Follow-up request: continue with the task stored by the first request.
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  }

  blockDataCleanup(pTaskInner->pResBlockDst);
  bool hasNext = true;
  while (1) {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
    if (!hasNext) {
      break;
    }
    // Resolve the group id once and reuse it for the destination block; the
    // second lookup with identical arguments was redundant.
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
    pTaskInner->pResBlockDst->info.id.groupId = pTaskInner->pResBlock->info.id.groupId;

    SSDataBlock* pBlock = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
    if (pBlock != NULL && pBlock->info.rows > 0) {
      STREAM_CHECK_RET_GOTO(
        processTag(pVnode, sStreamReaderInfo->pExprInfo, sStreamReaderInfo->numOfExpr, &pTaskInner->api, pBlock));
    }
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, pTaskInner->pFilterInfo));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
    ST_TASK_DLOG("vgId:%d %s get skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
    // This condition is always true, so exactly one block is returned per request.
    if (pTaskInner->pResBlockDst->info.rows >= 0) { //todo
      break;
    }
  }

  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
  if (!hasNext) {
    // Reader exhausted: drop the session entry so later requests get NO_CONTEXT.
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);

  return code;
}
1478

1479
/*
 * Serve a calc-data pull request for one group over [skey, ekey].
 *
 * A request of type STRIGGER_PULL_TSDB_CALC_DATA creates a reader task and
 * stores it in sStreamReaderInfo->streamTaskMap under the session key; any other
 * type resumes the task stored there. Blocks are filtered and merged into
 * pTaskInner->pResBlockDst until the reader is exhausted or
 * STREAM_RETURN_ROWS_NUM rows have been collected. The merged block is then
 * passed through blockDataTransform() into a block created from
 * sStreamReaderInfo->calcResBlock, which is encoded as the response payload.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP carrying `code` is sent on every path.
 * Returns 0 on success, otherwise the code of the first failing step.
 */
static int32_t vnodeProcessStreamTsdbCalcDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t      code = 0;
  int32_t      lino = 0;
  size_t       size = 0;
  void*        buf = NULL;
  SSDataBlock* pBlockRes = NULL;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, skey:%" PRId64 ",ekey:%" PRId64 ",gid:%" PRId64, TD_VID(pVnode), __func__,
               req->tsdbCalcDataReq.skey, req->tsdbCalcDataReq.ekey, req->tsdbCalcDataReq.gid);

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->triggerCols, TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN);

  SStreamReaderTaskInner* pTaskInner = NULL;
  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_CALC_DATA);

  if (req->base.type != STRIGGER_PULL_TSDB_CALC_DATA) {
    // Continuation: the task must already be registered for this session.
    void** ppStored = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(ppStored, TSDB_CODE_STREAM_NO_CONTEXT);
    pTaskInner = *(SStreamReaderTaskInner**)ppStored;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  } else {
    // Initial request: build the task, register it, then give it a destination block.
    BUILD_OPTION(options, sStreamReaderInfo, req->tsdbCalcDataReq.ver, true, TSDB_ORDER_ASC, req->tsdbCalcDataReq.skey,
                 req->tsdbCalcDataReq.ekey, sStreamReaderInfo->triggerCols, false, STREAM_SCAN_GROUP_ONE_BY_ONE,
                 req->tsdbCalcDataReq.gid, true, NULL);
    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(
        createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, NULL, &api));

    STREAM_CHECK_RET_GOTO(
        taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
  }

  blockDataCleanup(pTaskInner->pResBlockDst);

  bool hasNext = true;
  for (;;) {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
    if (!hasNext) {
      break;
    }

    SSDataBlock* pCur = pTaskInner->pResBlock;
    pCur->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pCur->info.id.uid);

    SSDataBlock* pBlock = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, pTaskInner->pFilterInfo));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));

    // Stop once enough rows are collected; hasNext stays true so the task is kept.
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
      break;
    }
  }

  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->calcResBlock, false, &pBlockRes));
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlockRes, pTaskInner->pResBlockDst->info.capacity));
  blockDataTransform(pBlockRes, pTaskInner->pResBlockDst);
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlockRes->info.rows);

  if (!hasNext) {
    // Reader exhausted: the session entry is no longer needed.
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP,
      .info = pMsg->info,
      .pCont = buf,
      .contLen = size,
      .code = code,
  };
  tmsgSendRsp(&rsp);
  blockDataDestroy(pBlockRes);
  return code;
}
1547

1548
/*
 * Serve a TSDB data pull request for a single table (req->tsdbDataReq.uid).
 *
 * A request of type STRIGGER_PULL_TSDB_DATA builds reader options for the
 * requested column ids, creates a reader task, re-initialises its query
 * condition, opens a tsdb reader on the one table and stores the task in
 * sStreamReaderInfo->streamTaskMap keyed by the table uid. Any other type
 * resumes the task stored under that uid.
 *
 * Blocks are merged into pTaskInner->pResBlockDst until the reader is exhausted
 * or STREAM_RETURN_ROWS_NUM rows have been collected; the task is removed from
 * the map once the reader is exhausted.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP carrying `code` is sent on every path.
 * Returns 0 on success, otherwise the code of the first failing step.
 */
static int32_t vnodeProcessStreamTsdbVirtalDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  size_t  size = 0;
  void*   buf = NULL;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStreamReaderTaskInner* pTaskInner = NULL;
  // Unlike the session-keyed handlers, the map key here is the table uid itself.
  int64_t key = req->tsdbDataReq.uid;

  if (req->base.type != STRIGGER_PULL_TSDB_DATA) {
    void** ppStored = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(ppStored, TSDB_CODE_STREAM_NO_CONTEXT);
    pTaskInner = *(SStreamReaderTaskInner**)ppStored;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  } else {
    SStreamTriggerReaderTaskInnerOptions options = {0};
    STREAM_CHECK_RET_GOTO(createOptionsForTsdbData(pVnode, &options, sStreamReaderInfo, req->tsdbDataReq.uid,
                                                   req->tsdbDataReq.cids, req->tsdbDataReq.order,
                                                   req->tsdbDataReq.skey, req->tsdbDataReq.ekey,
                                                   req->tsdbDataReq.ver));
    reSetUid(&options, req->tsdbDataReq.suid, req->tsdbDataReq.uid);

    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, NULL, &api));

    // Rebuild the query condition from the task's own options, then open a reader on just this table.
    STableKeyInfo keyInfo = {.uid = req->tsdbDataReq.uid};
    cleanupQueryTableDataCond(&pTaskInner->cond);
    STREAM_CHECK_RET_GOTO(qStreamInitQueryTableDataCond(
        &pTaskInner->cond, pTaskInner->options.order, pTaskInner->options.schemas, pTaskInner->options.isSchema,
        pTaskInner->options.twindows, pTaskInner->options.suid, pTaskInner->options.ver));
    STREAM_CHECK_RET_GOTO(pTaskInner->api.tsdReader.tsdReaderOpen(pVnode, &pTaskInner->cond, &keyInfo, 1,
                                                                  pTaskInner->pResBlock, (void**)&pTaskInner->pReader,
                                                                  pTaskInner->idStr, NULL));

    STREAM_CHECK_RET_GOTO(
        taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
    STREAM_CHECK_RET_GOTO(createOneDataBlock(pTaskInner->pResBlock, false, &pTaskInner->pResBlockDst));
  }

  blockDataCleanup(pTaskInner->pResBlockDst);

  bool hasNext = true;
  for (;;) {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
    if (!hasNext) {
      break;
    }

    SSDataBlock* pBlock = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));

    // Stop once enough rows are collected; hasNext stays true so the task is kept.
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
      break;
    }
  }

  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);

  if (!hasNext) {
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP,
      .info = pMsg->info,
      .pCont = buf,
      .contLen = size,
      .code = code,
  };
  tmsgSendRsp(&rsp);

  return code;
}
1619

1620
/*
 * Serve a WAL meta pull request: scan the WAL from req->walMetaReq.lastVer and
 * return the resulting meta block.
 *
 * When sStreamReaderInfo->uidList is NULL a table list is built from the
 * reader's suid/uid/tag conditions and passed to scanWal(); otherwise the scan
 * runs in "virtual table" mode against sStreamReaderInfo->uidHash.
 *
 * If the scan succeeds but yields zero rows, the response carries
 * TSDB_CODE_STREAM_NO_DATA and an 8-byte payload holding the last scanned WAL
 * version; the function itself still returns 0 in that case.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP is sent on every path.
 * Returns 0 on success or "no data", otherwise the code of the first failing step.
 */
static int32_t vnodeProcessStreamWalMetaReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t      code = 0;
  int32_t      lino = 0;
  void*        buf = NULL;
  size_t       size = 0;
  SSDataBlock* pBlock = NULL;
  void*        pTableList = NULL;
  SNodeList*   groupNew = NULL;
  int64_t      lastVer = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, request paras lastVer:%" PRId64 ",ctime:%" PRId64, TD_VID(pVnode), __func__,
  req->walMetaReq.lastVer, req->walMetaReq.ctime);

  bool isVTable = sStreamReaderInfo->uidList != NULL;
  if (!isVTable) {
    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(nodesCloneList(sStreamReaderInfo->partitionCols, &groupNew));
    STREAM_CHECK_RET_GOTO(qStreamCreateTableListForReader(
        pVnode, sStreamReaderInfo->suid, sStreamReaderInfo->uid, sStreamReaderInfo->tableType,
        groupNew, false, sStreamReaderInfo->pTagCond, sStreamReaderInfo->pTagIndexCond, &api,
        &pTableList, sStreamReaderInfo->groupIdMap));
  }

  STREAM_CHECK_RET_GOTO(createBlockForWalMeta(&pBlock, isVTable));
  STREAM_CHECK_RET_GOTO(scanWal(pVnode, isVTable ? sStreamReaderInfo->uidHash : pTableList, isVTable, pBlock,
                                req->walMetaReq.lastVer, sStreamReaderInfo->deleteReCalc,
                                sStreamReaderInfo->deleteOutTbl, req->walMetaReq.ctime, &lastVer));

  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlock->info.rows);
  STREAM_CHECK_RET_GOTO(buildRsp(pBlock, &buf, &size));
  printDataBlock(pBlock, __func__, "");

end:
  // Only a successful scan may be reported as "no data". Previously any
  // failure that left the block empty was overwritten with NO_DATA and then
  // reset to 0, hiding the real error from both the peer and the caller.
  if (code == 0 && pBlock != NULL && pBlock->info.rows == 0) {
    // NOTE(review): assumes buildRsp() leaves buf untouched for an empty block - confirm, else the old buf leaks here.
    buf = rpcMallocCont(sizeof(int64_t));
    if (buf == NULL) {
      // Allocation failed: report it rather than dereferencing NULL.
      code = terrno;
      size = 0;
    } else {
      *(int64_t *)buf = lastVer;
      size = sizeof(int64_t);
      code = TSDB_CODE_STREAM_NO_DATA;
    }
  }
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  // NO_DATA is a normal outcome for the caller; it is only signalled to the peer.
  if (code == TSDB_CODE_STREAM_NO_DATA){
    code = 0;
  }
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  nodesDestroyList(groupNew);
  blockDataDestroy(pBlock);
  qStreamDestroyTableList(pTableList);

  return code;
}
1675

1676
/*
 * Serve a WAL data pull request for one WAL version (req->walDataReq.ver),
 * one table uid and the time window [skey, ekey].
 *
 * The request type selects how the WAL entry is turned into a block:
 *   STRIGGER_PULL_WAL_DATA         -> processWalVerDataVTable() with the requested column ids
 *   STRIGGER_PULL_WAL_CALC_DATA    -> processWalVerData() using calcResBlock
 *   STRIGGER_PULL_WAL_TRIGGER_DATA -> processWalVerData() using triggerResBlock
 *   STRIGGER_PULL_WAL_TS_DATA      -> processWalVerData() using tsBlock
 * Any other type leaves pBlock NULL.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP carrying `code` is sent on every path.
 * Returns 0 on success, otherwise the code of the first failing step.
 */
static int32_t vnodeProcessStreamWalDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t      code = 0;
  int32_t      lino = 0;
  void*        buf = NULL;
  size_t       size = 0;
  SSDataBlock* pBlock = NULL;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, request type:%d skey:%" PRId64 ",ekey:%" PRId64 ",uid:%" PRId64 ",ver:%" PRId64, TD_VID(pVnode),
  __func__, req->walDataReq.base.type, req->walDataReq.skey, req->walDataReq.ekey, req->walDataReq.uid, req->walDataReq.ver);

  STimeWindow window = {.skey = req->walDataReq.skey, .ekey = req->walDataReq.ekey};
  if (req->walDataReq.base.type == STRIGGER_PULL_WAL_DATA){
    STREAM_CHECK_RET_GOTO(processWalVerDataVTable(pVnode, req->walDataReq.cids, req->walDataReq.ver,
      req->walDataReq.uid, &window, &pBlock));
  } else if (req->walDataReq.base.type == STRIGGER_PULL_WAL_CALC_DATA){
    STREAM_CHECK_RET_GOTO(processWalVerData(pVnode, sStreamReaderInfo, req->walDataReq.ver, false,
      req->walDataReq.uid, &window, sStreamReaderInfo->calcResBlock, &pBlock));
  } else if (req->walDataReq.base.type == STRIGGER_PULL_WAL_TRIGGER_DATA) {
    STREAM_CHECK_RET_GOTO(processWalVerData(pVnode, sStreamReaderInfo, req->walDataReq.ver, true,
      req->walDataReq.uid, &window, sStreamReaderInfo->triggerResBlock, &pBlock));
  } else if (req->walDataReq.base.type == STRIGGER_PULL_WAL_TS_DATA){
    STREAM_CHECK_RET_GOTO(processWalVerData(pVnode, sStreamReaderInfo, req->walDataReq.ver, false,
      req->walDataReq.uid, &window, sStreamReaderInfo->tsBlock, &pBlock));

  }

  // pBlock is still NULL when the type matched no branch above, so the log
  // must not dereference it unconditionally.
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__,
               (pBlock != NULL) ? pBlock->info.rows : (int64_t)0);
  // NOTE(review): printDataBlock()/buildRsp() receive pBlock as-is; presumably they tolerate NULL - confirm.
  printDataBlock(pBlock, __func__, "");
  STREAM_CHECK_RET_GOTO(buildRsp(pBlock, &buf, &size));
end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);

  blockDataDestroy(pBlock);
  return code;
}
1716

1717
/*
 * Serve a group-column-value pull request: look up the entry stored for
 * req->groupColValueReq.gid in sStreamReaderInfo->groupIdMap and return it
 * serialized with tSerializeSStreamGroupInfo().
 *
 * Fails with TSDB_CODE_STREAM_NO_CONTEXT when the group id is not in the map.
 * On any failure the response buffer is freed and an empty payload is sent.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP carrying `code` is sent on every path.
 * Returns 0 on success, otherwise the code of the first failing step.
 */
static int32_t vnodeProcessStreamGroupColValueReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;
  // Signed holder for the serializer result. Storing it directly in the
  // unsigned `size` made every "< 0" failure check below always false.
  int32_t len = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, request gid:%" PRId64, TD_VID(pVnode), __func__, req->groupColValueReq.gid);

  SArray** gInfo = taosHashGet(sStreamReaderInfo->groupIdMap, &req->groupColValueReq.gid, POINTER_BYTES);
  STREAM_CHECK_NULL_GOTO(gInfo, TSDB_CODE_STREAM_NO_CONTEXT);
  SStreamGroupInfo pGroupInfo = {0};
  pGroupInfo.gInfo = *gInfo;

  // First pass with a NULL buffer computes the encoded length.
  len = tSerializeSStreamGroupInfo(NULL, 0, &pGroupInfo, TD_VID(pVnode));
  STREAM_CHECK_CONDITION_GOTO(len < 0, len);
  buf = rpcMallocCont(len);
  STREAM_CHECK_NULL_GOTO(buf, terrno);
  // Second pass writes into the buffer and returns the final length.
  len = tSerializeSStreamGroupInfo(buf, len, &pGroupInfo, TD_VID(pVnode));
  STREAM_CHECK_CONDITION_GOTO(len < 0, len);
  size = (size_t)len;
end:
  if (code != 0) {
    rpcFreeCont(buf);
    buf = NULL;
    size = 0;
  }
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);

  return code;
}
1751

1752
/*
 * Serve a virtual-table-info pull request: for every table selected by the
 * reader's suid/uid/tag conditions, return its uid, group id and column
 * references.
 *
 * Column references are chosen per table as follows:
 *   - if req->virTableInfoReq.cids holds only PRIMARYKEY_TIMESTAMP_COL_ID, all
 *     of the table's column references are copied;
 *   - otherwise one slot per requested column id is allocated and filled with
 *     the matching reference that has `hasRef` set; slots without a match stay
 *     zero-filled.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP carrying `code` is sent on every path.
 * Returns 0 on success, otherwise the code of the first failing step.
 */
static int32_t vnodeProcessStreamVTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t              code = 0;
  int32_t              lino = 0;
  void*                buf = NULL;
  size_t               size = 0;
  SStreamMsgVTableInfo vTableInfo = {0};
  SMetaReader          metaReader = {0};
  SNodeList*           groupNew = NULL;
  void*                pTableList = NULL;
  SStorageAPI api = {0};
  initStorageAPI(&api);

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  STREAM_CHECK_RET_GOTO(nodesCloneList(sStreamReaderInfo->partitionCols, &groupNew));
  STREAM_CHECK_RET_GOTO(qStreamCreateTableListForReader(
      pVnode, sStreamReaderInfo->suid, sStreamReaderInfo->uid, sStreamReaderInfo->tableType,
      groupNew, true, sStreamReaderInfo->pTagCond, sStreamReaderInfo->pTagIndexCond, &api,
      &pTableList, sStreamReaderInfo->groupIdMap));

  SArray* cids = req->virTableInfoReq.cids;
  STREAM_CHECK_NULL_GOTO(cids, terrno);

  SArray* pTableListArray = qStreamGetTableArrayList(pTableList);
  STREAM_CHECK_NULL_GOTO(pTableListArray, terrno);

  vTableInfo.infos = taosArrayInit(taosArrayGetSize(pTableListArray), sizeof(VTableInfo));
  STREAM_CHECK_NULL_GOTO(vTableInfo.infos, terrno);
  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);

  size_t nCids = taosArrayGetSize(cids);
  for (size_t i = 0; i < taosArrayGetSize(pTableListArray); i++) {
    STableKeyInfo* pKeyInfo = taosArrayGet(pTableListArray, i);
    if (pKeyInfo == NULL) {
      continue;
    }
    VTableInfo* vTable = taosArrayReserve(vTableInfo.infos, 1);
    STREAM_CHECK_NULL_GOTO(vTable, terrno);
    vTable->uid = pKeyInfo->uid;
    vTable->gId = pKeyInfo->groupId;
    // Keep the entry in a destroyable state in case one of the checks below bails out.
    vTable->cols.nCols = 0;
    vTable->cols.pColRef = NULL;

    // The lookup result used to be assigned to `code` and then ignored, so a
    // failed lookup went on to read whatever the reader held from before.
    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, pKeyInfo->uid));

    if (nCids == 1 && *(col_id_t*)taosArrayGet(cids, 0) == PRIMARYKEY_TIMESTAMP_COL_ID){
      // Only the timestamp column was requested: return every column reference of the table.
      vTable->cols.nCols = metaReader.me.colRef.nCols;
      vTable->cols.version = metaReader.me.colRef.version;
      vTable->cols.pColRef = taosMemoryCalloc(metaReader.me.colRef.nCols, sizeof(SColRef));
      if (metaReader.me.colRef.nCols > 0) {
        STREAM_CHECK_NULL_GOTO(vTable->cols.pColRef, terrno);
      }
      for (size_t j = 0; j < metaReader.me.colRef.nCols; j++) {
        memcpy(vTable->cols.pColRef + j, &metaReader.me.colRef.pColRef[j], sizeof(SColRef));
      }
    } else {
      // One output slot per requested column id, in request order.
      vTable->cols.nCols = nCids;
      vTable->cols.version = metaReader.me.colRef.version;
      vTable->cols.pColRef = taosMemoryCalloc(nCids, sizeof(SColRef));
      if (nCids > 0) {
        STREAM_CHECK_NULL_GOTO(vTable->cols.pColRef, terrno);
      }
      for (size_t k = 0; k < nCids; k++) {
        for (size_t j = 0; j < metaReader.me.colRef.nCols; j++) {
          if (metaReader.me.colRef.pColRef[j].hasRef &&
              metaReader.me.colRef.pColRef[j].id == *(col_id_t*)taosArrayGet(cids, k)) {
            memcpy(vTable->cols.pColRef + k, &metaReader.me.colRef.pColRef[j], sizeof(SColRef));
            break;
          }
        }
      }
    }
    tDecoderClear(&metaReader.coder);
  }
  ST_TASK_DLOG("vgId:%d %s end", TD_VID(pVnode), __func__);
  STREAM_CHECK_RET_GOTO(buildVTableInfoRsp(&vTableInfo, &buf, &size));

end:
  nodesDestroyList(groupNew);
  qStreamDestroyTableList(pTableList);
  tDestroySStreamMsgVTableInfo(&vTableInfo);
  api.metaReaderFn.clearReader(&metaReader);
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  return code;
}
1832

1833
/*
 * Serve an origin-table-info pull request: for every (refTableName,
 * refColName) pair in req->origTableInfoReq.cols, resolve the table by name and
 * return its uid, its super-table uid (0 for a normal table) and the id of the
 * referenced column.
 *
 * For a child table the schema is taken from its super table; for a normal
 * table from the table itself. A table of any other type, or a column name
 * that is not found, is logged as an error and leaves the column id unset.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP carrying `code` is sent on every path.
 * Returns 0 on success, otherwise the code of the first failing step.
 */
static int32_t vnodeProcessStreamOTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req) {
  int32_t                   code = 0;
  int32_t                   lino = 0;
  void*                     buf = NULL;
  size_t                    size = 0;
  SSTriggerOrigTableInfoRsp oTableInfo = {0};
  SMetaReader               metaReader = {0};
  int64_t streamId = req->base.streamId;
  stsDebug("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStorageAPI api = {0};
  initStorageAPI(&api);

  SArray* cols = req->origTableInfoReq.cols;
  STREAM_CHECK_NULL_GOTO(cols, terrno);

  oTableInfo.cols = taosArrayInit(taosArrayGetSize(cols), sizeof(OTableInfoRsp));

  STREAM_CHECK_NULL_GOTO(oTableInfo.cols, terrno);

  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
  for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
    OTableInfo*    oInfo = taosArrayGet(cols, i);
    OTableInfoRsp* vTableInfo = taosArrayReserve(oTableInfo.cols, 1);
    STREAM_CHECK_NULL_GOTO(oInfo, terrno);
    STREAM_CHECK_NULL_GOTO(vTableInfo, terrno);
    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByName(&metaReader, oInfo->refTableName));
    vTableInfo->uid = metaReader.me.uid;
    stsDebug("vgId:%d %s uid:%"PRId64, TD_VID(pVnode), __func__, vTableInfo->uid);

    SSchemaWrapper* sSchemaWrapper = NULL;
    if (metaReader.me.type == TD_CHILD_TABLE) {
      // A child table's columns are defined by its super table: re-point the reader at it.
      int64_t suid = metaReader.me.ctbEntry.suid;
      vTableInfo->suid = suid;
      tDecoderClear(&metaReader.coder);
      STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, suid));
      sSchemaWrapper = &metaReader.me.stbEntry.schemaRow;
    } else if (metaReader.me.type == TD_NORMAL_TABLE) {
      vTableInfo->suid = 0;
      sSchemaWrapper = &metaReader.me.ntbEntry.schemaRow;
    } else {
      stError("invalid table type:%d", metaReader.me.type);
    }

    // sSchemaWrapper stays NULL for an unsupported table type. Skip the column
    // search then, rather than dereferencing NULL; the missing column is
    // reported by the cid check below.
    if (sSchemaWrapper != NULL) {
      for (size_t j = 0; j < sSchemaWrapper->nCols; j++) {
        SSchema* s = sSchemaWrapper->pSchema + j;
        if (strcmp(s->name, oInfo->refColName) == 0) {
          vTableInfo->cid = s->colId;
          break;
        }
      }
    }
    if (vTableInfo->cid == 0) {
      stError("vgId:%d %s, not found col %s in table %s", TD_VID(pVnode), __func__, oInfo->refColName,
              oInfo->refTableName);
    }
    tDecoderClear(&metaReader.coder);
  }

  STREAM_CHECK_RET_GOTO(buildOTableInfoRsp(&oTableInfo, &buf, &size));

end:
  tDestroySTriggerOrigTableInfoRsp(&oTableInfo);
  api.metaReaderFn.clearReader(&metaReader);
  STREAM_PRINT_LOG_END(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  return code;
}
1902

1903
/**
 * Handle a STRIGGER_PULL_VTABLE_PSEUDO_COL request.
 *
 * Looks up the table identified by req->virTablePseudoColReq.uid, which must be a virtual
 * normal table or a virtual child table, and builds a one-row data block containing the
 * pseudo columns listed in req->virTablePseudoColReq.cids:
 *   - column id -1 is filled with the table name;
 *   - for a virtual child table every other id is matched against the super table's tag
 *     schema and filled with the child's tag value; ids that are not found in the tag schema
 *     become a NULL-typed column holding NULL.
 * For a virtual normal table only the table name can be requested, so the first requested
 * id must be -1.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP is always sent, carrying either the serialized block or the
 * error code.
 *
 * @param pVnode vnode that owns the meta store being read
 * @param pMsg   incoming RPC message; only pMsg->info is used, to address the response
 * @param req    decoded pull request
 * @return 0 on success, otherwise the error code that was also placed in the response
 */
static int32_t vnodeProcessStreamVTableTagInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req) {
  int32_t      code = 0;
  int32_t      lino = 0;
  void*        buf = NULL;
  size_t       size = 0;
  SSDataBlock* pBlock = NULL;

  SMetaReader metaReader = {0};
  SMetaReader metaReaderStable = {0};
  // NOTE(review): streamId has no other visible use; presumably the stsDebug macro reads it.
  int64_t streamId = req->base.streamId;
  stsDebug("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStorageAPI api = {0};
  initStorageAPI(&api);

  SArray* cols = req->virTablePseudoColReq.cids;
  STREAM_CHECK_NULL_GOTO(cols, terrno);

  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
  STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, req->virTablePseudoColReq.uid));

  STREAM_CHECK_CONDITION_GOTO(metaReader.me.type != TD_VIRTUAL_CHILD_TABLE && metaReader.me.type != TD_VIRTUAL_NORMAL_TABLE, TSDB_CODE_INVALID_PARA);

  STREAM_CHECK_RET_GOTO(createDataBlock(&pBlock));
  if (metaReader.me.type == TD_VIRTUAL_NORMAL_TABLE) {
    // The list must be non-empty and start with the table-name pseudo column. The size test
    // has to short-circuit (||) so that an empty list is rejected instead of dereferencing
    // the out-of-range element.
    STREAM_CHECK_CONDITION_GOTO(taosArrayGetSize(cols) < 1 || *(col_id_t*)taosArrayGet(cols, 0) != -1, TSDB_CODE_INVALID_PARA);
    SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN, -1);
    STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, 1));
    pBlock->info.rows = 1;
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, 0);
    STREAM_CHECK_NULL_GOTO(pDst, terrno);
    STREAM_CHECK_RET_GOTO(varColSetVarData(pDst, 0, metaReader.me.name, strlen(metaReader.me.name), false));
  } else if (metaReader.me.type == TD_VIRTUAL_CHILD_TABLE) {
    int64_t suid = metaReader.me.ctbEntry.suid;
    // Release the first reader's lock before the second reader is initialised with
    // META_READER_LOCK.
    api.metaReaderFn.readerReleaseLock(&metaReader);
    api.metaReaderFn.initReader(&metaReaderStable, pVnode, META_READER_LOCK, &api.metaFn);

    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReaderStable, suid));
    SSchemaWrapper* sSchemaWrapper = &metaReaderStable.me.stbEntry.schemaTag;

    // Pass 1: describe one output column per requested id, in request order.
    for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
      col_id_t* id = taosArrayGet(cols, i);
      STREAM_CHECK_NULL_GOTO(id, terrno);
      if (*id == -1) {
        SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN, -1);
        STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
        continue;
      }
      size_t j = 0;
      for (; j < sSchemaWrapper->nCols; j++) {
        SSchema* s = sSchemaWrapper->pSchema + j;
        if (s->colId == *id) {
          SColumnInfoData idata = createColumnInfoData(s->type, s->bytes, s->colId);
          STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
          break;
        }
      }
      if (j == sSchemaWrapper->nCols) {
        // Unknown tag id: keep the column position but mark it as a NULL-typed column.
        SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_NULL, CHAR_BYTES, *id);
        STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
      }
    }
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, 1));
    pBlock->info.rows = 1;

    // Pass 2: fill row 0 of every column.
    for (size_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
      STREAM_CHECK_NULL_GOTO(pDst, terrno);

      if (pDst->info.colId == -1) {
        STREAM_CHECK_RET_GOTO(varColSetVarData(pDst, 0, metaReader.me.name, strlen(metaReader.me.name), false));
        continue;
      }
      if (pDst->info.type == TSDB_DATA_TYPE_NULL) {
        STREAM_CHECK_RET_GOTO(colDataSetVal(pDst, 0, NULL, true));
        continue;
      }

      STagVal val = {0};
      val.cid = pDst->info.colId;
      const char* p = api.metaFn.extractTagVal(metaReader.me.ctbEntry.pTags, pDst->info.type, &val);

      bool  isJson = (pDst->info.type == TSDB_DATA_TYPE_JSON);
      char* data = NULL;
      if (!isJson && p != NULL) {
        data = tTagValToData((const STagVal*)p, false);
      } else {
        data = (char*)p;
      }

      // Only the buffer produced for a variable-length, non-JSON tag is released here; the
      // other cases are never freed by this function.
      bool ownsData = !isJson && (p != NULL) && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) && (data != NULL);
      bool isNull = (data == NULL) || (isJson && tTagIsJsonNull(data));

      code = colDataSetVal(pDst, 0, data, isNull);
      // Free before checking the result so the buffer is not leaked when setting the value fails.
      if (ownsData) {
        taosMemoryFree(data);
      }
      if (code != 0) {
        lino = __LINE__;
        goto end;
      }
    }
  } else {
    stError("vgId:%d %s, invalid table type:%d", TD_VID(pVnode), __func__, metaReader.me.type);
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  stsDebug("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlock->info.rows);
  printDataBlock(pBlock, __func__, "");
  STREAM_CHECK_RET_GOTO(buildRsp(pBlock, &buf, &size));

end:
  api.metaReaderFn.clearReader(&metaReaderStable);
  api.metaReaderFn.clearReader(&metaReader);
  STREAM_PRINT_LOG_END(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  blockDataDestroy(pBlock);
  return code;
}
2020

2021
/**
 * Handle a TDMT_STREAM_FETCH message: run the reader-side calculation task selected by
 * (queryId, taskId, execId) and return the produced data blocks.
 *
 * When the request asks for a reset, or no executor task exists yet for the selected calc
 * info, the old task is destroyed and a new one is created from the stored scan plan, using
 * the runtime function info carried by the request. The task is then executed once, each
 * result block is optionally filtered (external-window mode), and the blocks are serialized
 * into a TDMT_STREAM_FETCH_RSP. A response is always sent, carrying the error code on failure.
 *
 * @param pVnode vnode on which the task executes
 * @param pMsg   incoming RPC message holding the serialized SResFetchReq
 * @return 0 on success, otherwise the error code that was also placed in the response
 */
static int32_t vnodeProcessStreamFetchMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;
  void*   taskAddr = NULL;
  SArray* pResList = NULL;

  SResFetchReq req = {0};
  STREAM_CHECK_CONDITION_GOTO(tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0,
                              TSDB_CODE_QRY_INVALID_INPUT);
  SArray* calcInfoList = (SArray*)qStreamGetReaderInfo(req.queryId, req.taskId, &taskAddr);
  STREAM_CHECK_NULL_GOTO(calcInfoList, terrno);

  STREAM_CHECK_CONDITION_GOTO(req.execId < 0, TSDB_CODE_INVALID_PARA);
  SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo = taosArrayGetP(calcInfoList, req.execId);
  STREAM_CHECK_NULL_GOTO(sStreamReaderCalcInfo, terrno);
  // NOTE(review): pTask has no other visible use; presumably ST_TASK_DLOG reads it.
  void* pTask = sStreamReaderCalcInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, execId:%d, reset:%d, pTaskInfo:%p, scan type:%d", TD_VID(pVnode), __func__, req.execId, req.reset,
               sStreamReaderCalcInfo->pTaskInfo, nodeType(sStreamReaderCalcInfo->calcAst->pNode));

  if (req.reset || sStreamReaderCalcInfo->pTaskInfo == NULL) {
    // The rebuild below dereferences the runtime function info unconditionally, so reject a
    // request that does not carry it before any state is touched.
    STREAM_CHECK_NULL_GOTO(req.pStRtFuncInfo, TSDB_CODE_INVALID_PARA);

    qDestroyTask(sStreamReaderCalcInfo->pTaskInfo);
    // Drop the stale pointer right away: if a later step fails before the task is recreated,
    // the next request must see "no task" rather than reuse or re-destroy a freed one.
    sStreamReaderCalcInfo->pTaskInfo = NULL;

    int64_t uid = 0;
    if (req.dynTbname) {
      // Use the uid of the first partition value that is flagged as a table name.
      SArray* vals = req.pStRtFuncInfo->pStreamPartColVals;
      for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
        SStreamGroupValue* pValue = taosArrayGet(vals, i);
        if (pValue != NULL && pValue->isTbname) {
          uid = pValue->uid;
          break;
        }
      }
    }

    SReadHandle handle = {0};
    handle.vnode = pVnode;
    handle.uid = uid;

    initStorageAPI(&handle.api);
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(sStreamReaderCalcInfo->calcAst->pNode) ||
        QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == nodeType(sStreamReaderCalcInfo->calcAst->pNode)) {
      STimeRangeNode* node = (STimeRangeNode*)((STableScanPhysiNode*)(sStreamReaderCalcInfo->calcAst->pNode))->pTimeRange;
      if (node != NULL) {
        STREAM_CHECK_RET_GOTO(processCalaTimeRange(sStreamReaderCalcInfo, &req, node, &handle));
      }
    }

    // Swap the request's function info into the calc info; the previous contents end up in
    // the request.
    TSWAP(sStreamReaderCalcInfo->rtInfo.funcInfo, *req.pStRtFuncInfo);
    handle.streamRtInfo = &sStreamReaderCalcInfo->rtInfo;

    // The executor task is always rebuilt from the scan plan here; it is not reset in place.
    STREAM_CHECK_RET_GOTO(qCreateStreamExecTaskInfo(&sStreamReaderCalcInfo->pTaskInfo,
                                                    sStreamReaderCalcInfo->calcScanPlan, &handle, NULL, TD_VID(pVnode),
                                                    req.taskId));

    STREAM_CHECK_RET_GOTO(qSetTaskId(sStreamReaderCalcInfo->pTaskInfo, req.taskId, req.queryId));
  }

  if (req.pOpParam != NULL) {
    qUpdateOperatorParam(sStreamReaderCalcInfo->pTaskInfo, req.pOpParam);
  }

  pResList = taosArrayInit(4, POINTER_BYTES);
  STREAM_CHECK_NULL_GOTO(pResList, terrno);
  uint64_t ts = 0;
  bool     hasNext = false;
  STREAM_CHECK_RET_GOTO(qExecTaskOpt(sStreamReaderCalcInfo->pTaskInfo, pResList, &ts, &hasNext, NULL, req.pOpParam != NULL));

  for (size_t i = 0; i < taosArrayGetSize(pResList); i++) {
    SSDataBlock* pBlock = taosArrayGetP(pResList, i);
    if (pBlock == NULL) continue;
    printDataBlock(pBlock, __func__, "fetch");
    if (sStreamReaderCalcInfo->rtInfo.funcInfo.withExternalWindow) {
      STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, sStreamReaderCalcInfo->pFilterInfo));
      printDataBlock(pBlock, __func__, "fetch filter");
    }
  }

  STREAM_CHECK_RET_GOTO(streamBuildFetchRsp(pResList, hasNext, &buf, &size, pVnode->config.tsdbCfg.precision));
  ST_TASK_DLOG("vgId:%d %s end:", TD_VID(pVnode), __func__);

end:
  // Only the pointer array is destroyed here; the blocks themselves are not freed by this function.
  taosArrayDestroy(pResList);
  streamReleaseTask(taskAddr);

  STREAM_PRINT_LOG_END(code, lino);
  SRpcMsg rsp = {.msgType = TDMT_STREAM_FETCH_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  tDestroySResFetchReq(&req);
  return code;
}
2116

2117
/**
 * Entry point for messages taken from the vnode's stream reader queue.
 *
 * If the vnode is not ready to serve reads the message is redirected and 0 is returned.
 * TDMT_STREAM_FETCH messages are handed to vnodeProcessStreamFetchMsg. TDMT_STREAM_TRIGGER_PULL
 * messages are decoded (the payload follows an SMsgHead) and dispatched on req.base.type to
 * the matching handler. Any other message type, or an unknown pull type, is logged and
 * reported as TSDB_CODE_APP_ERROR.
 *
 * @param pVnode vnode that received the message
 * @param pMsg   message to process
 * @return 0 after a redirect, otherwise the code produced by the selected handler or by the
 *         decoding / dispatch step
 */
int32_t vnodeProcessStreamReaderMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  int32_t                   code = 0;
  int32_t                   lino = 0;
  void*                     taskAddr = NULL;
  void*                     payload = NULL;
  int32_t                   payloadLen = 0;
  SStreamTriggerReaderInfo* readerInfo = NULL;
  SSTriggerPullRequestUnion req = {0};

  vDebug("vgId:%d, msg:%p in stream reader queue is processing", pVnode->config.vgId, pMsg);

  if (!syncIsReadyForRead(pVnode->sync)) {
    vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
    return 0;
  }

  // Fetch requests are handled entirely by the dedicated handler.
  if (pMsg->msgType == TDMT_STREAM_FETCH) {
    return vnodeProcessStreamFetchMsg(pVnode, pMsg);
  }

  if (pMsg->msgType != TDMT_STREAM_TRIGGER_PULL) {
    vError("unknown msg type:%d in stream reader queue", pMsg->msgType);
    code = TSDB_CODE_APP_ERROR;
    goto end;
  }

  // The pull request body starts right after the message head.
  payload = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  payloadLen = pMsg->contLen - sizeof(SMsgHead);
  STREAM_CHECK_RET_GOTO(tDeserializeSTriggerPullRequest(payload, payloadLen, &req));
  stDebug("vgId:%d %s start, type:%d, streamId:%" PRIx64 ", readerTaskId:%" PRIx64 ", sessionId:%" PRIx64,
          TD_VID(pVnode), __func__, req.base.type, req.base.streamId, req.base.readerTaskId, req.base.sessionId);

  // Every pull type except STRIGGER_PULL_OTABLE_INFO looks up the reader info first.
  if (req.base.type != STRIGGER_PULL_OTABLE_INFO) {
    readerInfo = qStreamGetReaderInfo(req.base.streamId, req.base.readerTaskId, &taskAddr);
  }

  switch (req.base.type) {
    case STRIGGER_PULL_SET_TABLE:
      code = vnodeProcessStreamSetTableReq(pVnode, pMsg, &req, readerInfo);
      break;

    case STRIGGER_PULL_LAST_TS:
      code = vnodeProcessStreamLastTsReq(pVnode, pMsg, &req, readerInfo);
      break;

    case STRIGGER_PULL_FIRST_TS:
      code = vnodeProcessStreamFirstTsReq(pVnode, pMsg, &req, readerInfo);
      break;

    case STRIGGER_PULL_TSDB_META:
    case STRIGGER_PULL_TSDB_META_NEXT:
      code = vnodeProcessStreamTsdbMetaReq(pVnode, pMsg, &req, readerInfo);
      break;

    case STRIGGER_PULL_TSDB_TS_DATA:
      code = vnodeProcessStreamTsdbTsDataReq(pVnode, pMsg, &req, readerInfo);
      break;

    case STRIGGER_PULL_TSDB_TRIGGER_DATA:
    case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT:
      code = vnodeProcessStreamTsdbTriggerDataReq(pVnode, pMsg, &req, readerInfo);
      break;

    case STRIGGER_PULL_TSDB_CALC_DATA:
    case STRIGGER_PULL_TSDB_CALC_DATA_NEXT:
      code = vnodeProcessStreamTsdbCalcDataReq(pVnode, pMsg, &req, readerInfo);
      break;

    case STRIGGER_PULL_TSDB_DATA:
    case STRIGGER_PULL_TSDB_DATA_NEXT:
      code = vnodeProcessStreamTsdbVirtalDataReq(pVnode, pMsg, &req, readerInfo);
      break;

    case STRIGGER_PULL_WAL_META:
      code = vnodeProcessStreamWalMetaReq(pVnode, pMsg, &req, readerInfo);
      break;

    case STRIGGER_PULL_WAL_TS_DATA:
    case STRIGGER_PULL_WAL_TRIGGER_DATA:
    case STRIGGER_PULL_WAL_CALC_DATA:
    case STRIGGER_PULL_WAL_DATA:
      code = vnodeProcessStreamWalDataReq(pVnode, pMsg, &req, readerInfo);
      break;

    case STRIGGER_PULL_GROUP_COL_VALUE:
      code = vnodeProcessStreamGroupColValueReq(pVnode, pMsg, &req, readerInfo);
      break;

    case STRIGGER_PULL_VTABLE_INFO:
      code = vnodeProcessStreamVTableInfoReq(pVnode, pMsg, &req, readerInfo);
      break;

    case STRIGGER_PULL_VTABLE_PSEUDO_COL:
      code = vnodeProcessStreamVTableTagInfoReq(pVnode, pMsg, &req);
      break;

    case STRIGGER_PULL_OTABLE_INFO:
      code = vnodeProcessStreamOTableInfoReq(pVnode, pMsg, &req);
      break;

    default:
      vError("unknown inner msg type:%d in stream reader queue", req.base.type);
      code = TSDB_CODE_APP_ERROR;
      break;
  }

end:
  // taskAddr is still NULL when no reader-info lookup was performed.
  streamReleaseTask(taskAddr);
  tDestroySTriggerPullRequest(&req);
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc