• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4674

18 Aug 2025 07:58AM UTC coverage: 59.821% (+0.1%) from 59.715%
#4674

push

travis-ci

web-flow
test: update case desc (#32551)

136937 of 292075 branches covered (46.88%)

Branch coverage included in aggregate %.

207916 of 284395 relevant lines covered (73.11%)

4553289.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.55
/source/dnode/vnode/src/vnd/vnodeStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "scalar.h"
17
#include "streamReader.h"
18
#include "tdatablock.h"
19
#include "tdb.h"
20
#include "tencode.h"
21
#include "tglobal.h"
22
#include "tmsg.h"
23
#include "vnd.h"
24
#include "vnode.h"
25
#include "vnodeInt.h"
26

27
// Declares a local SStreamTriggerReaderTaskInnerOptions variable named `options`,
// filled from `sStreamInfo` plus the per-call arguments.
// `.suid` is taken from sStreamInfo only when `_uidList` is NULL; otherwise it is 0.
// Every expression argument is parenthesized so that compound expressions passed by a
// caller (e.g. a ternary or a pointer expression) expand with the intended precedence.
#define BUILD_OPTION(options, sStreamInfo, _ver, _groupSort, _order, startTime, endTime, _schemas, _isSchema, _scanMode, \
                     _gid, _initReader, _uidList)                                                                        \
  SStreamTriggerReaderTaskInnerOptions options = {.suid = ((_uidList) == NULL ? (sStreamInfo)->suid : 0),          \
                                                  .uid = (sStreamInfo)->uid,                                       \
                                                  .ver = (_ver),                                                   \
                                                  .tableType = (sStreamInfo)->tableType,                           \
                                                  .groupSort = (_groupSort),                                       \
                                                  .order = (_order),                                               \
                                                  .twindows = {.skey = (startTime), .ekey = (endTime)},            \
                                                  .pTagCond = (sStreamInfo)->pTagCond,                             \
                                                  .pTagIndexCond = (sStreamInfo)->pTagIndexCond,                   \
                                                  .pConditions = (sStreamInfo)->pConditions,                       \
                                                  .partitionCols = (sStreamInfo)->partitionCols,                   \
                                                  .schemas = (_schemas),                                           \
                                                  .isSchema = (_isSchema),                                         \
                                                  .scanMode = (_scanMode),                                         \
                                                  .gid = (_gid),                                                   \
                                                  .initReader = (_initReader),                                     \
                                                  .uidList = (_uidList)};
46

47
// Packs `type` (shifted left by 32 bits) and `session` into a single 64-bit key.
static int64_t getSessionKey(int64_t session, int64_t type) {
  int64_t typeBits = type << 32;
  return session | typeBits;
}
29,065✔
48

49
static int32_t buildScheamFromMeta(SVnode* pVnode, int64_t uid, SArray** schemas);
50

51
// Copies one fixed-width cell from `data` into column `index` of pResBlock.
// The cell is written at the slot addressed by the block's current row count
// (pResBlock->info.rows); the row count itself is not modified here.
// Returns terrno when the column cannot be fetched, 0 otherwise.
static int32_t addColData(SSDataBlock* pResBlock, int32_t index, void* data) {
  SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, index);
  if (NULL == pCol) {
    return terrno;
  }

  // byte offset of the target row = current row count * column width
  char* pDst = pCol->pData + pResBlock->info.rows * pCol->info.bytes;
  memcpy(pDst, data, pCol->info.bytes);
  return 0;
}
60

61
// Advances the task's reader via tsdNextDataBlock and reports through *hasNext.
// When the reader call fails, the reader's data block is released before the
// error code is returned.
static int32_t getTableDataInfo(SStreamReaderTaskInner* pTask, bool* hasNext) {
  int32_t code = pTask->api.tsdReader.tsdNextDataBlock(pTask->pReader, hasNext);
  if (TSDB_CODE_SUCCESS == code) {
    return code;
  }

  pTask->api.tsdReader.tsdReaderReleaseDataBlock(pTask->pReader);
  return code;
}
69

70
// Forwards to the reader's tsdReaderRetrieveDataBlock with a NULL third argument,
// storing the result block in *ppRes, and returns that call's status code.
static int32_t getTableData(SStreamReaderTaskInner* pTask, SSDataBlock** ppRes) {
  int32_t status = pTask->api.tsdReader.tsdReaderRetrieveDataBlock(pTask->pReader, ppRes, NULL);
  return status;
}
73

74
static int32_t buildOTableInfoRsp(const SSTriggerOrigTableInfoRsp* rsp, void** data, size_t* size) {
47✔
75
  int32_t code = 0;
47✔
76
  int32_t lino = 0;
47✔
77
  void*   buf = NULL;
47✔
78
  int32_t len = tSerializeSTriggerOrigTableInfoRsp(NULL, 0, rsp);
47✔
79
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
47!
80
  buf = rpcMallocCont(len);
47✔
81
  STREAM_CHECK_NULL_GOTO(buf, terrno);
47!
82
  int32_t actLen = tSerializeSTriggerOrigTableInfoRsp(buf, len, rsp);
47✔
83
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
47!
84
  *data = buf;
47✔
85
  *size = len;
47✔
86
  buf = NULL;
47✔
87
end:
47✔
88
  rpcFreeCont(buf);
47✔
89
  return code;
47✔
90
}
91

92
static int32_t buildVTableInfoRsp(const SStreamMsgVTableInfo* rsp, void** data, size_t* size) {
112✔
93
  int32_t code = 0;
112✔
94
  int32_t lino = 0;
112✔
95
  void*   buf = NULL;
112✔
96
  int32_t len = tSerializeSStreamMsgVTableInfo(NULL, 0, rsp);
112✔
97
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
112!
98
  buf = rpcMallocCont(len);
112✔
99
  STREAM_CHECK_NULL_GOTO(buf, terrno);
112!
100
  int32_t actLen = tSerializeSStreamMsgVTableInfo(buf, len, rsp);
112✔
101
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
112!
102
  *data = buf;
112✔
103
  *size = len;
112✔
104
  buf = NULL;
112✔
105
end:
112✔
106
  rpcFreeCont(buf);
112✔
107
  return code;
112✔
108
}
109

110
static int32_t buildTsRsp(const SStreamTsResponse* tsRsp, void** data, size_t* size) {
635✔
111
  int32_t code = 0;
635✔
112
  int32_t lino = 0;
635✔
113
  void*   buf = NULL;
635✔
114
  int32_t len = tSerializeSStreamTsResponse(NULL, 0, tsRsp);
635✔
115
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
635!
116
  buf = rpcMallocCont(len);
635✔
117
  STREAM_CHECK_NULL_GOTO(buf, terrno);
635!
118
  int32_t actLen = tSerializeSStreamTsResponse(buf, len, tsRsp);
635✔
119
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
635!
120
  *data = buf;
635✔
121
  *size = len;
635✔
122
  buf = NULL;
635✔
123
end:
635✔
124
  rpcFreeCont(buf);
635✔
125
  return code;
635✔
126
}
127

128

129
// Encodes pBlock into a buffer obtained from rpcMallocCont.
// A NULL block or a block with zero rows is a successful no-op: *data and *size
// are not written in that case.
// On success *data receives the buffer and *size the size reported by
// blockGetEncodeSize; the caller then owns the buffer.
static int32_t buildRsp(SSDataBlock* pBlock, void** data, size_t* size) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   payload = NULL;

  STREAM_CHECK_CONDITION_GOTO(pBlock == NULL || pBlock->info.rows == 0, TSDB_CODE_SUCCESS);

  size_t encodeCap = blockGetEncodeSize(pBlock);
  payload = rpcMallocCont(encodeCap);
  STREAM_CHECK_NULL_GOTO(payload, terrno);

  int32_t encoded = blockEncode(pBlock, payload, encodeCap, taosArrayGetSize(pBlock->pDataBlock));
  STREAM_CHECK_CONDITION_GOTO(encoded < 0, terrno);

  *size = encodeCap;
  *data = payload;
  payload = NULL;  // ownership moved to the caller; keep the cleanup below from freeing it

end:
  rpcFreeCont(payload);
  return code;
}
146

147
// Re-targets the task's reader at the table list of the current group
// (pTask->currentGroupIndex), rebuilds pTask->cond from the task options and
// resets the reader status with the rebuilt condition.
static int32_t resetTsdbReader(SStreamReaderTaskInner* pTask) {
  int32_t        code = 0;
  int32_t        lino = 0;
  STableKeyInfo* pTables = NULL;
  int32_t        tableCnt = 1;

  STREAM_CHECK_RET_GOTO(qStreamGetTableList(pTask->pTableList, pTask->currentGroupIndex, &pTables, &tableCnt));
  STREAM_CHECK_RET_GOTO(pTask->api.tsdReader.tsdSetQueryTableList(pTask->pReader, pTables, tableCnt));

  // drop the previous query condition before building a new one in place
  cleanupQueryTableDataCond(&pTask->cond);
  STREAM_CHECK_RET_GOTO(qStreamInitQueryTableDataCond(&pTask->cond, pTask->options.order, pTask->options.schemas, true,
                                                      pTask->options.twindows, pTask->options.suid, pTask->options.ver));

  STREAM_CHECK_RET_GOTO(pTask->api.tsdReader.tsdReaderResetStatus(pTask->pReader, &pTask->cond));

end:
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
164

165
// Writes one WAL meta row into pBlock at the block's current row slot.
// Column order: type, [gid - only when !isVTable], uid, skey, ekey, ver, rows.
// The block's row count is not advanced here; callers increment it afterwards.
static int32_t buildWalMetaBlock(SSDataBlock* pBlock, int8_t type, int64_t gid, bool isVTable, int64_t uid,
                                 int64_t skey, int64_t ekey, int64_t ver, int64_t rows) {
  int32_t code = 0;
  int32_t lino = 0;

  // collect the cell sources in output-column order; gid is omitted for virtual tables
  void*   cells[7];
  int32_t nCells = 0;
  cells[nCells++] = &type;
  if (!isVTable) {
    cells[nCells++] = &gid;
  }
  cells[nCells++] = &uid;
  cells[nCells++] = &skey;
  cells[nCells++] = &ekey;
  cells[nCells++] = &ver;
  cells[nCells++] = &rows;

  for (int32_t col = 0; col < nCells; ++col) {
    STREAM_CHECK_RET_GOTO(addColData(pBlock, col, cells[col]));
  }

end:
  STREAM_PRINT_LOG_END(code, lino)
  return code;
}
184

185
// Fills pTSchema as a single-column schema with the given version and the
// given id / type / byte width for column 0.
static void buildTSchema(STSchema* pTSchema, int32_t ver, col_id_t colId, int8_t type, int32_t bytes) {
  STColumn* pOnlyCol = &pTSchema->columns[0];

  pTSchema->version = ver;
  pTSchema->numOfCols = 1;

  pOnlyCol->colId = colId;
  pOnlyCol->type = type;
  pOnlyCol->bytes = bytes;
}
4,942✔
192

193
// Appends one WAL_SUBMIT_DATA meta row to pBlock for a submitted table block:
// uid, group id, first/last timestamp, version and row count.
// Table blocks whose uid is not tracked in pTableList are skipped with success.
// pTableList is looked up with taosHashGet when isVTable is set, otherwise
// through the qStream* table-list helpers.
int32_t retrieveWalMetaData(SSubmitTbData* pSubmitTbData, void* pTableList, bool isVTable, SSDataBlock* pBlock,
                            int64_t ver) {
  int32_t   code = 0;
  int32_t   lino = 0;
  STSchema* pTsOnlySchema = NULL;
  int64_t   uid = pSubmitTbData->uid;
  uint64_t  gid = 0;
  int32_t   rowCnt = 0;
  int64_t   firstTs = 0;
  int64_t   lastTs = 0;

  if (!isVTable) {
    STREAM_CHECK_CONDITION_GOTO(!qStreamUidInTableList(pTableList, uid), TDB_CODE_SUCCESS);
    gid = qStreamGetGroupId(pTableList, uid);
  } else {
    STREAM_CHECK_CONDITION_GOTO(taosHashGet(pTableList, &uid, sizeof(uid)) == NULL, TDB_CODE_SUCCESS);
  }

  if ((pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) == 0) {
    // row format: read the timestamp of the first and last row through a
    // temporary one-column (timestamp only) schema
    rowCnt = taosArrayGetSize(pSubmitTbData->aRowP);
    SRow* pRow = taosArrayGetP(pSubmitTbData->aRowP, 0);
    STREAM_CHECK_NULL_GOTO(pRow, terrno);

    SColVal tsVal = {0};
    pTsOnlySchema = taosMemoryCalloc(1, sizeof(STSchema) + sizeof(STColumn));
    STREAM_CHECK_NULL_GOTO(pTsOnlySchema, terrno);
    buildTSchema(pTsOnlySchema, pSubmitTbData->sver, PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES);

    STREAM_CHECK_RET_GOTO(tRowGet(pRow, pTsOnlySchema, 0, &tsVal));
    firstTs = VALUE_GET_TRIVIAL_DATUM(&tsVal.value);

    pRow = taosArrayGetP(pSubmitTbData->aRowP, rowCnt - 1);
    STREAM_CHECK_RET_GOTO(tRowGet(pRow, pTsOnlySchema, 0, &tsVal));
    lastTs = VALUE_GET_TRIVIAL_DATUM(&tsVal.value);
  } else {
    // column format: first and last value of column 0
    SColData* pTsCol = taosArrayGet(pSubmitTbData->aCol, 0);
    STREAM_CHECK_NULL_GOTO(pTsCol, terrno);
    rowCnt = pTsCol->nVal;

    SColVal tsVal = {0};
    STREAM_CHECK_RET_GOTO(tColDataGetValue(pTsCol, 0, &tsVal));
    firstTs = VALUE_GET_TRIVIAL_DATUM(&tsVal.value);

    STREAM_CHECK_RET_GOTO(tColDataGetValue(pTsCol, rowCnt - 1, &tsVal));
    lastTs = VALUE_GET_TRIVIAL_DATUM(&tsVal.value);
  }

  STREAM_CHECK_RET_GOTO(buildWalMetaBlock(pBlock, WAL_SUBMIT_DATA, gid, isVTable, uid, firstTs, lastTs, ver, rowCnt));
  pBlock->info.rows++;
  stDebug("stream reader scan submit data:uid %" PRIu64 ", skey %" PRIu64 ", ekey %" PRIu64 ", gid %" PRIu64
          ", rows:%d",
          uid, firstTs, lastTs, gid, rowCnt);

end:
  taosMemoryFree(pTsOnlySchema);
  STREAM_PRINT_LOG_END(code, lino)
  return code;
}
248

249
// Materializes the rows of one submitted table block into pBlock.
//
// pVnode         - vnode whose meta store supplies the table schema for (uid, sver)
// pSubmitTbData  - decoded submit payload for one table (column or row format)
// pBlock         - destination block; its columns are matched against the source by column id
// window         - optional time filter; when non-NULL only rows with
//                  window->skey <= ts <= window->ekey are copied
//
// On success pBlock->info.rows is set to the number of rows copied. When a window is
// given in column format and no row falls inside it, the function returns success
// without touching pBlock->info.rows.
// Returns TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND when the schema lookup fails, otherwise
// the first error reported by a helper.
int32_t retrieveWalData(SVnode* pVnode, SSubmitTbData* pSubmitTbData, SSDataBlock* pBlock, STimeWindow* window) {
  stDebug("stream reader retrieve data block %p", pSubmitTbData);
  int32_t code = 0;
  int32_t lino = 0;

  int32_t ver = pSubmitTbData->sver;
  int64_t uid = pSubmitTbData->uid;
  int32_t numOfRows = 0;

  STSchema* schemas = metaGetTbTSchema(pVnode->pMeta, uid, ver, 1);
  STREAM_CHECK_NULL_GOTO(schemas, TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND);
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    // column 0 is read as the timestamp column to resolve the row range
    SColData* pTsCol = taosArrayGet(pSubmitTbData->aCol, 0);
    STREAM_CHECK_NULL_GOTO(pTsCol, terrno);
    int32_t rowStart = 0;
    int32_t rowEnd = pTsCol->nVal;
    if (window != NULL) {
      SColVal tsVal = {0};
      rowStart = -1;
      rowEnd = -1;
      for (int32_t k = 0; k < pTsCol->nVal; k++) {
        STREAM_CHECK_RET_GOTO(tColDataGetValue(pTsCol, k, &tsVal));
        int64_t ts = VALUE_GET_TRIVIAL_DATUM(&tsVal.value);
        if (ts >= window->skey && rowStart == -1) {
          rowStart = k;
        }
        if (ts > window->ekey && rowEnd == -1) {
          rowEnd = k;
        }
      }
      // nothing at or after skey, or the first such row is already past ekey
      STREAM_CHECK_CONDITION_GOTO(rowStart == -1 || rowStart == rowEnd, TDB_CODE_SUCCESS);

      if (rowStart != -1 && rowEnd == -1) {
        rowEnd = pTsCol->nVal;
      }
    }
    numOfRows = rowEnd - rowStart;
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, numOfRows));
    for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
      STREAM_CHECK_NULL_GOTO(pColData, terrno);
      if (i >= taosArrayGetSize(pSubmitTbData->aCol)) {
        break;
      }
      for (int32_t j = 0; j < taosArrayGetSize(pSubmitTbData->aCol); j++) {
        SColData* pCol = taosArrayGet(pSubmitTbData->aCol, j);
        STREAM_CHECK_NULL_GOTO(pCol, terrno);
        if (pCol->cid == pColData->info.colId) {
          for (int32_t k = rowStart; k < rowEnd; k++) {
            SColVal colVal = {0};
            STREAM_CHECK_RET_GOTO(tColDataGetValue(pCol, k, &colVal));
            // The destination row is relative to rowStart: the block was sized for
            // numOfRows (= rowEnd - rowStart) rows, so indexing by k would write past
            // the ensured capacity whenever the window skips leading rows.
            // NOTE(review): the row-format branch below routes var-length/decimal values
            // through varColSetVarData; this branch passes every type to colDataSetVal -
            // confirm that is intended for var-length columns.
            STREAM_CHECK_RET_GOTO(colDataSetVal(pColData, k - rowStart,
                                                VALUE_GET_DATUM(&colVal.value, colVal.value.type),
                                                !COL_VAL_IS_VALUE(&colVal)));
          }
        }
      }
    }
  } else {
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, taosArrayGetSize(pSubmitTbData->aRowP)));
    for (int32_t j = 0; j < taosArrayGetSize(pSubmitTbData->aRowP); j++) {
      SRow* pRow = taosArrayGetP(pSubmitTbData->aRowP, j);
      STREAM_CHECK_NULL_GOTO(pRow, terrno);
      SColVal tsVal = {0};
      STREAM_CHECK_RET_GOTO(tRowGet(pRow, schemas, 0, &tsVal));
      int64_t ts = VALUE_GET_TRIVIAL_DATUM(&tsVal.value);
      if (window != NULL && (ts < window->skey || ts > window->ekey)) {
        continue;
      }
      for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {  // reader todo test null
        if (i >= schemas->numOfCols) {
          break;
        }
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
        STREAM_CHECK_NULL_GOTO(pColData, terrno);
        SColVal colVal = {0};
        int32_t sourceIdx = 0;
        // Walk the schema columns until one with cid >= the wanted column id is found.
        // The walk is bounded by numOfCols so that a block column whose id is larger than
        // every schema column id ends the search (and is stored as NULL below) instead of
        // indexing past the schema.
        while (sourceIdx < schemas->numOfCols) {
          STREAM_CHECK_RET_GOTO(tRowGet(pRow, schemas, sourceIdx, &colVal));
          if (colVal.cid >= pColData->info.colId) {
            break;
          }
          sourceIdx++;
        }
        if (colVal.cid == pColData->info.colId && COL_VAL_IS_VALUE(&colVal)) {
          if (IS_VAR_DATA_TYPE(colVal.value.type) || colVal.value.type == TSDB_DATA_TYPE_DECIMAL){
            STREAM_CHECK_RET_GOTO(varColSetVarData(pColData, numOfRows, colVal.value.pData, colVal.value.nData, !COL_VAL_IS_VALUE(&colVal)));
          } else {
            STREAM_CHECK_RET_GOTO(colDataSetVal(pColData, numOfRows, (const char*)(&(colVal.value.val)), !COL_VAL_IS_VALUE(&colVal)));
          }
        } else {
          colDataSetNULL(pColData, numOfRows);
        }
      }
      // numOfRows doubles as the destination row index for rows that pass the window
      numOfRows++;
    }
  }

  pBlock->info.rows = numOfRows;

end:
  taosMemoryFree(schemas);
  STREAM_PRINT_LOG_END(code, lino)
  return code;
}
355

356
// Appends one WAL_DELETE_DATA meta row (range req->skey..req->ekey, row count 1)
// to pBlock for `uid`. The uid is skipped with success when it is not found in
// pTableList: via taosHashGet for virtual tables, or when qStreamGetGroupId
// yields -1 otherwise.
static int32_t buildDeleteData(void* pTableList, bool isVTable, SSDataBlock* pBlock, SDeleteRes* req, int64_t uid, int64_t ver){
  int32_t  code = 0;
  int32_t  lino = 0;
  uint64_t groupId = 0;

  stDebug("stream reader scan delete start data:uid %" PRIu64 ", skey %" PRIu64 ", ekey %" PRIu64, uid, req->skey, req->ekey);

  if (!isVTable) {
    groupId = qStreamGetGroupId(pTableList, uid);
    STREAM_CHECK_CONDITION_GOTO(groupId == -1, TDB_CODE_SUCCESS);
  } else {
    STREAM_CHECK_CONDITION_GOTO(taosHashGet(pTableList, &uid, sizeof(uid)) == NULL, TDB_CODE_SUCCESS);
  }

  STREAM_CHECK_RET_GOTO(
      buildWalMetaBlock(pBlock, WAL_DELETE_DATA, groupId, isVTable, uid, req->skey, req->ekey, ver, 1));
  pBlock->info.rows++;

  stDebug("stream reader scan delete end data:uid %" PRIu64 ", skey %" PRIu64 ", ekey %" PRIu64, uid, req->skey, req->ekey);

end:
  return code;
}
375

376
// Decodes a delete result from `data`/`len` and appends one delete meta row to
// pBlock for every uid listed in it (see buildDeleteData for the per-uid filter).
//
// Returns terrno when the uid list cannot be allocated or an entry cannot be read,
// otherwise the first error reported by decoding or by buildDeleteData.
static int32_t scanDeleteData(void* pTableList, bool isVTable, SSDataBlock* pBlock, void* data, int32_t len,
                              int64_t ver) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SDecoder   decoder = {0};
  SDeleteRes req = {0};

  tDecoderInit(&decoder, data, len);

  // the decoder fills req.uidList, so a failed allocation must stop here
  // instead of being handed to tDecodeDeleteRes as a NULL array
  req.uidList = taosArrayInit(0, sizeof(tb_uid_t));
  STREAM_CHECK_NULL_GOTO(req.uidList, terrno);

  STREAM_CHECK_RET_GOTO(tDecodeDeleteRes(&decoder, &req));

  for (int32_t i = 0; i < taosArrayGetSize(req.uidList); i++) {
    uint64_t* uid = taosArrayGet(req.uidList, i);
    STREAM_CHECK_NULL_GOTO(uid, terrno);
    STREAM_CHECK_RET_GOTO(buildDeleteData(pTableList, isVTable, pBlock, &req, *uid, ver));
  }

end:
  taosArrayDestroy(req.uidList);
  tDecoderClear(&decoder);
  return code;
}
397

398
// Decodes a drop-table batch from `data`/`len` and appends one WAL_DELETE_TABLE
// meta row (skey 0, ekey 0, row count 1) to pBlock for every dropped table that
// is found in pTableList. Tables that are not found are silently skipped.
static int32_t scanDropTable(void* pTableList, bool isVTable, SSDataBlock* pBlock, void* data, int32_t len,
                             int64_t ver) {
  int32_t          code = 0;
  int32_t          lino = 0;
  SDecoder         decoder = {0};
  SVDropTbBatchReq req = {0};

  tDecoderInit(&decoder, data, len);
  STREAM_CHECK_RET_GOTO(tDecodeSVDropTbBatchReq(&decoder, &req));

  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    SVDropTbReq* pDrop = req.pReqs + iReq;
    STREAM_CHECK_NULL_GOTO(pDrop, TSDB_CODE_INVALID_PARA);

    uint64_t gid = 0;
    if (!isVTable) {
      gid = qStreamGetGroupId(pTableList, pDrop->uid);
      if (gid == -1) {
        continue;
      }
    } else if (taosHashGet(pTableList, &pDrop->uid, sizeof(pDrop->uid)) == NULL) {
      continue;
    }

    STREAM_CHECK_RET_GOTO(buildWalMetaBlock(pBlock, WAL_DELETE_TABLE, gid, isVTable, pDrop->uid, 0, 0, ver, 1));
    pBlock->info.rows++;
    stDebug("stream reader scan drop :uid %" PRIu64 ", gid %" PRIu64, pDrop->uid, gid);
  }

end:
  tDecoderClear(&decoder);
  return code;
}
429

430
// Decodes a submit request from `data`/`len` and appends one meta row per
// contained table block to pBlock via retrieveWalMetaData. pBlock's capacity is
// grown up front by the number of table blocks in the request.
static int32_t scanSubmitData(void* pTableList, bool isVTable, SSDataBlock* pBlock, void* data, int32_t len,
                              int64_t ver) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SDecoder    decoder = {0};
  SSubmitReq2 submit = {0};

  tDecoderInit(&decoder, data, len);
  STREAM_CHECK_RET_GOTO(tDecodeSubmitReq(&decoder, &submit, NULL));

  int32_t numOfBlocks = taosArrayGetSize(submit.aSubmitTbData);
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, pBlock->info.rows + numOfBlocks));

  for (int32_t blkIdx = 0; blkIdx < numOfBlocks; ++blkIdx) {
    stDebug("stream reader scan submit, next data block %d/%d", blkIdx, numOfBlocks);
    SSubmitTbData* pTbData = taosArrayGet(submit.aSubmitTbData, blkIdx);
    STREAM_CHECK_NULL_GOTO(pTbData, terrno);
    STREAM_CHECK_RET_GOTO(retrieveWalMetaData(pTbData, pTableList, isVTable, pBlock, ver));
  }

end:
  tDestroySubmitReq(&submit, TSDB_MSG_FLG_DECODE);
  tDecoderClear(&decoder);
  return code;
}
456

457
// Scans the vnode's WAL starting at lastVer + 1 and collects meta rows into pBlock.
// Delete messages are handled only when deleteData != 0, drop-table messages only
// when deleteTb != 0, submit messages always; other message types are ignored.
// *retVer is refreshed from walGetAppliedVer before every read attempt.
// The scan stops when no further valid message can be read, when an entry's
// ingestTs / 1000 exceeds ctime, or once pBlock holds at least
// STREAM_RETURN_ROWS_NUM rows. A failed seek or read ends the scan with success.
static int32_t scanWal(SVnode* pVnode, void* pTableList, bool isVTable, SSDataBlock* pBlock, int64_t lastVer,
                       int8_t deleteData, int8_t deleteTb, int64_t ctime, int64_t* retVer) {
  int32_t code = 0;
  int32_t lino = 0;

  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);
  *retVer = walGetAppliedVer(pWalReader->pWal);
  STREAM_CHECK_CONDITION_GOTO(walReaderSeekVer(pWalReader, lastVer + 1) != 0, TSDB_CODE_SUCCESS);

  for (;;) {
    *retVer = walGetAppliedVer(pWalReader->pWal);
    STREAM_CHECK_CONDITION_GOTO(walNextValidMsg(pWalReader, true) < 0, TSDB_CODE_SUCCESS);

    SWalCont* wCont = &pWalReader->pHead->head;
    if (wCont->ingestTs / 1000 > ctime) {
      break;
    }
    int64_t ver = wCont->version;

    stDebug("vgId:%d stream reader scan wal ver:%" PRId64 ", type:%d, deleteData:%d, deleteTb:%d",
      TD_VID(pVnode), ver, wCont->msgType, deleteData, deleteTb);

    if (wCont->msgType == TDMT_VND_DELETE && deleteData != 0) {
      // delete / drop payloads start right after the SMsgHead
      void*   data = POINTER_SHIFT(wCont->body, sizeof(SMsgHead));
      int32_t len = wCont->bodyLen - sizeof(SMsgHead);
      STREAM_CHECK_RET_GOTO(scanDeleteData(pTableList, isVTable, pBlock, data, len, ver));
    } else if (wCont->msgType == TDMT_VND_DROP_TABLE && deleteTb != 0) {
      void*   data = POINTER_SHIFT(wCont->body, sizeof(SMsgHead));
      int32_t len = wCont->bodyLen - sizeof(SMsgHead);
      STREAM_CHECK_RET_GOTO(scanDropTable(pTableList, isVTable, pBlock, data, len, ver));
    } else if (wCont->msgType == TDMT_VND_SUBMIT) {
      // submit payloads start after the SSubmitReq2Msg header instead
      void*   data = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
      int32_t len = wCont->bodyLen - sizeof(SSubmitReq2Msg);
      STREAM_CHECK_RET_GOTO(scanSubmitData(pTableList, isVTable, pBlock, data, len, ver));
    }

    if (pBlock->info.rows >= STREAM_RETURN_ROWS_NUM) {
      break;
    }
  }

end:
  walCloseReader(pWalReader);
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
499

500
// Reads the single WAL entry at version `ver`, which must be a submit message
// (otherwise TSDB_CODE_STREAM_WAL_VER_NOT_DATA is returned), and for every table
// block in it whose uid equals `uid`: fills pBlock via retrieveWalData (optionally
// filtered by `window`), merges it into pBlockRet and then cleans pBlock up.
int32_t scanWalOneVer(SVnode* pVnode, SSDataBlock* pBlock, SSDataBlock* pBlockRet,
                      int64_t ver, int64_t uid, STimeWindow* window) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SDecoder    decoder = {0};
  SSubmitReq2 submit = {0};

  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);

  STREAM_CHECK_RET_GOTO(walFetchHead(pWalReader, ver));
  STREAM_CHECK_CONDITION_GOTO(pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT, TSDB_CODE_STREAM_WAL_VER_NOT_DATA);
  STREAM_CHECK_RET_GOTO(walFetchBody(pWalReader));

  // the submit payload follows the SSubmitReq2Msg header inside the WAL body
  void*   pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
  int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);

  tDecoderInit(&decoder, pBody, bodyLen);
  STREAM_CHECK_RET_GOTO(tDecodeSubmitReq(&decoder, &submit, NULL));

  int32_t numOfBlocks = taosArrayGetSize(submit.aSubmitTbData);
  for (int32_t blkIdx = 0; blkIdx < numOfBlocks; ++blkIdx) {
    stDebug("stream reader next data block %d/%d", blkIdx, numOfBlocks);
    SSubmitTbData* pTbData = taosArrayGet(submit.aSubmitTbData, blkIdx);
    STREAM_CHECK_NULL_GOTO(pTbData, terrno);

    if (pTbData->uid == uid) {
      STREAM_CHECK_RET_GOTO(retrieveWalData(pVnode, pTbData, pBlock, window));
      printDataBlock(pBlock, __func__, "");

      STREAM_CHECK_RET_GOTO(blockDataMerge(pBlockRet, pBlock));
      blockDataCleanup(pBlock);
    } else {
      stDebug("stream reader skip data block uid:%" PRId64, pTbData->uid);
    }
  }

end:
  tDestroySubmitReq(&submit, TSDB_MSG_FLG_DECODE);
  walCloseReader(pWalReader);
  tDecoderClear(&decoder);
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
543

544
// Fills the tag / tbname pseudo columns of pBlock for the table pBlock->info.id.uid.
//
// pExpr / numOfExpr - expressions describing the output columns; each one writes the
//                     column at its resSchema.slotId
// api               - storage API used to read the table's meta entry
//
// For every expression the same value is replicated over all pBlock->info.rows rows.
// When the meta entry could not be loaded (mr.me.name stays NULL) the column is set
// to NULL for all rows instead. Returns success immediately when numOfExpr is 0.
static int32_t processTag(SVnode* pVnode, SExprInfo* pExpr, int32_t numOfExpr, SStorageAPI* api, SSDataBlock* pBlock) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SMetaReader mr = {0};

  if (numOfExpr == 0) {
    return TSDB_CODE_SUCCESS;
  }
  api->metaReaderFn.initReader(&mr, pVnode, META_READER_LOCK, &api->metaFn);
  code = api->metaReaderFn.getEntryGetUidCache(&mr, pBlock->info.id.uid);
  if (code != TSDB_CODE_SUCCESS) {
    // not fatal: the columns are filled with NULL below because mr.me.name is NULL
    stError("failed to get table meta, uid:%" PRId64 ", code:%s", pBlock->info.id.uid, tstrerror(code));
  }
  api->metaReaderFn.readerReleaseLock(&mr);
  for (int32_t j = 0; j < numOfExpr; ++j) {
    const SExprInfo* pExpr1 = &pExpr[j];
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
    STREAM_CHECK_NULL_GOTO(pColInfoData, terrno);
    if (mr.me.name == NULL) {
      colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
      continue;
    }
    colInfoDataCleanup(pColInfoData, pBlock->info.rows);

    int32_t functionId = pExpr1->pExpr->_function.functionId;

    // this is to handle the tbname
    if (fmIsScanPseudoColumnFunc(functionId)) {
      int32_t fType = pExpr1->pExpr->_function.functionType;
      if (fType == FUNCTION_TYPE_TBNAME) {
        STREAM_CHECK_RET_GOTO(setTbNameColData(pBlock, pColInfoData, functionId, mr.me.name));
        pColInfoData->info.colId = -1;
      }
    } else {  // these are tags
      bool    isJson = (pColInfoData->info.type == TSDB_DATA_TYPE_JSON);
      STagVal tagVal = {0};
      tagVal.cid = pExpr1->base.pParam[0].pCol->colId;
      const char* p = api->metaFn.extractTagVal(mr.me.ctbEntry.pTags, pColInfoData->info.type, &tagVal);

      char* data = NULL;
      if (!isJson && p != NULL) {
        data = tTagValToData((const STagVal*)p, false);
      } else {
        // JSON (or missing) tag: `data` aliases `p`, nothing is allocated here
        data = (char*)p;
      }

      bool isNullVal = (data == NULL) || (isJson && tTagIsJsonNull(data));
      if (isNullVal) {
        colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
      } else {
        code = colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows, false);
        // Only the non-JSON path went through tTagValToData, so only there may `p` be
        // read as a STagVal and `data` be released. For JSON, `data` is `p` itself and
        // must not be freed by this function.
        if (!isJson && IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
          taosMemoryFree(data);
        }
        STREAM_CHECK_RET_GOTO(code);
      }
    }
  }

end:
  api->metaReaderFn.clearReader(&mr);

  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
610

611
// Builds the result block for one WAL version of table `uid`.
// Two working blocks are created from sStreamInfo->triggerResBlock: a scratch block
// that receives each table block and an accumulator the scratch block is merged into
// (see scanWalOneVer). The accumulator then gets its tag columns filled (only when it
// has rows) and is filtered with sStreamInfo->pConditions.
// isTrigger == true : *pBlock takes ownership of the accumulator itself.
// isTrigger == false: *pBlock is created from pSrcBlock and filled through
//                     blockDataTransform from the accumulator.
static int32_t processWalVerData(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamInfo, int64_t ver, bool isTrigger,
                                 int64_t uid, STimeWindow* window, SSDataBlock* pSrcBlock, SSDataBlock** pBlock) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SFilterInfo* pFilter = NULL;
  SSDataBlock* pScratch = NULL;
  SSDataBlock* pMerged = NULL;
  int32_t      numOfExpr = sStreamInfo->numOfExpr;
  SExprInfo*   pExpr = sStreamInfo->pExprInfo;

  STREAM_CHECK_RET_GOTO(filterInitFromNode(sStreamInfo->pConditions, &pFilter, 0, NULL));

  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamInfo->triggerResBlock, false, &pScratch));
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamInfo->triggerResBlock, false, &pMerged));
  if (!isTrigger) {
    STREAM_CHECK_RET_GOTO(createOneDataBlock(pSrcBlock, false, pBlock));
  }

  pMerged->info.id.uid = uid;
  pScratch->info.id.uid = uid;

  STREAM_CHECK_RET_GOTO(scanWalOneVer(pVnode, pScratch, pMerged, ver, uid, window));

  if (pMerged->info.rows > 0) {
    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(processTag(pVnode, pExpr, numOfExpr, &api, pMerged));
  }
  STREAM_CHECK_RET_GOTO(qStreamFilter(pMerged, pFilter));

  if (isTrigger) {
    *pBlock = pMerged;
    pMerged = NULL;  // handed to the caller; keep the cleanup below from destroying it
  } else {
    blockDataTransform(*pBlock, pMerged);
  }

  printDataBlock(*pBlock, __func__, "processWalVerData2");

end:
  STREAM_PRINT_LOG_END(code, lino);
  filterFreeInfo(pFilter);
  blockDataDestroy(pScratch);
  blockDataDestroy(pMerged);
  return code;
}
655

656
/*
 * Builds an array of SSchema holding the column schemas of table `uid`.
 *
 * For a child table the schema row of its super table is used; for a normal table its own
 * schema row is used. Any other table type is rejected with TSDB_CODE_INVALID_PARA.
 *
 * pVnode   vnode whose meta store is read
 * uid      table to look up
 * schemas  out: newly allocated array on success (caller destroys it); NULL on failure
 *
 * Returns 0 on success, otherwise an error code.
 */
static int32_t buildScheamFromMeta(SVnode* pVnode, int64_t uid, SArray** schemas) {
  int32_t code = 0;
  int32_t lino = 0;
  SMetaReader metaReader = {0};
  SStorageAPI api = {0};
  initStorageAPI(&api);
  *schemas = taosArrayInit(8, sizeof(SSchema));
  STREAM_CHECK_NULL_GOTO(*schemas, terrno);

  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
  STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, uid));

  SSchemaWrapper* sSchemaWrapper = NULL;
  if (metaReader.me.type == TD_CHILD_TABLE) {
    // Re-point the reader at the super table: that entry carries the column schema.
    int64_t suid = metaReader.me.ctbEntry.suid;
    tDecoderClear(&metaReader.coder);
    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, suid));
    sSchemaWrapper = &metaReader.me.stbEntry.schemaRow;
  } else if (metaReader.me.type == TD_NORMAL_TABLE) {
    sSchemaWrapper = &metaReader.me.ntbEntry.schemaRow;
  } else {
    // No schema row to read from; bail out instead of dereferencing a NULL wrapper below.
    qError("invalid table type:%d", metaReader.me.type);
    code = TSDB_CODE_INVALID_PARA;
    lino = __LINE__;
    goto end;
  }

  for (size_t j = 0; j < sSchemaWrapper->nCols; j++) {
    SSchema* s = sSchemaWrapper->pSchema + j;
    STREAM_CHECK_NULL_GOTO(taosArrayPush(*schemas, s), terrno);
  }

end:
  api.metaReaderFn.clearReader(&metaReader);
  STREAM_PRINT_LOG_END(code, lino);
  if (code != 0) {
    // Reset the out pointer so a caller that also cleans up cannot free the array a second time.
    taosArrayDestroy(*schemas);
    *schemas = NULL;
  }
  return code;
}
691

692
/*
 * Removes from `schemas` every SSchema whose colId does not appear in `cols`.
 *
 * cols     array of col_id_t naming the columns to keep
 * schemas  array of SSchema, edited in place; relative order of kept entries is unchanged
 *
 * Returns 0 on success, or terrno when an array element cannot be fetched.
 */
static int32_t shrinkScheams(SArray* cols, SArray* schemas) {
  int32_t code = 0;
  int32_t lino = 0;
  size_t  pos = 0;

  while (pos < taosArrayGetSize(schemas)) {
    SSchema* pSchema = taosArrayGet(schemas, pos);
    STREAM_CHECK_NULL_GOTO(pSchema, terrno);

    bool   wanted = false;
    size_t numCols = taosArrayGetSize(cols);
    for (size_t k = 0; k < numCols; k++) {
      col_id_t* pId = taosArrayGet(cols, k);
      STREAM_CHECK_NULL_GOTO(pId, terrno);
      if (*pId == pSchema->colId) {
        wanted = true;
        break;
      }
    }

    if (wanted) {
      pos++;
    } else {
      // The next entry slides into `pos` after removal, so the cursor stays where it is.
      taosArrayRemove(schemas, pos);
    }
  }

end:
  return code;
}
717

718
/*
 * Scans one WAL version for table `uid` into a block that contains only the columns listed
 * in `cids`.
 *
 * pVnode  vnode whose meta store and WAL are read
 * cids    array of column ids to keep from the table schema
 * ver     WAL version handed to scanWalOneVer
 * uid     table uid; also stamped on both scratch blocks
 * window  time window handed to scanWalOneVer
 * pBlock  out: the scanned block on success (caller owns it); untouched on failure
 *
 * Returns 0 on success, otherwise the code of the first failing step.
 */
static int32_t processWalVerDataVTable(SVnode* pVnode, SArray *cids, int64_t ver,
  int64_t uid, STimeWindow* window, SSDataBlock** pBlock) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SArray*      schemas = NULL;

  SSDataBlock* pBlock1 = NULL;
  SSDataBlock* pBlock2 = NULL;

  code = buildScheamFromMeta(pVnode, uid, &schemas);
  if (code != 0) {
    // On failure the callee has already destroyed the array it allocated. Drop the local
    // pointer so the shared cleanup below does not destroy it a second time.
    schemas = NULL;
    lino = __LINE__;
    goto end;
  }
  STREAM_CHECK_RET_GOTO(shrinkScheams(cids, schemas));
  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, &pBlock1));
  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, &pBlock2));

  pBlock2->info.id.uid = uid;
  pBlock1->info.id.uid = uid;

  STREAM_CHECK_RET_GOTO(scanWalOneVer(pVnode, pBlock1, pBlock2, ver, uid, window));
  printDataBlock(pBlock2, __func__, "");

  // Transfer ownership of the result; clearing the local keeps it out of the cleanup below.
  *pBlock = pBlock2;
  pBlock2 = NULL;

end:
  STREAM_PRINT_LOG_END(code, lino);
  blockDataDestroy(pBlock1);
  blockDataDestroy(pBlock2);
  taosArrayDestroy(schemas);
  return code;
}
748

749
static int32_t createTSAndCondition(int64_t start, int64_t end, SLogicConditionNode** pCond,
263,150✔
750
                                    STargetNode* pTargetNodeTs) {
751
  int32_t code = 0;
263,150✔
752
  int32_t lino = 0;
263,150✔
753

754
  SColumnNode*         pCol = NULL;
263,150✔
755
  SColumnNode*         pCol1 = NULL;
263,150✔
756
  SValueNode*          pVal = NULL;
263,150✔
757
  SValueNode*          pVal1 = NULL;
263,150✔
758
  SOperatorNode*       op = NULL;
263,150✔
759
  SOperatorNode*       op1 = NULL;
263,150✔
760
  SLogicConditionNode* cond = NULL;
263,150✔
761

762
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pCol));
263,150!
763
  pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
263,150✔
764
  pCol->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
263,150✔
765
  pCol->node.resType.bytes = LONG_BYTES;
263,150✔
766
  pCol->slotId = pTargetNodeTs->slotId;
263,150✔
767
  pCol->dataBlockId = pTargetNodeTs->dataBlockId;
263,150✔
768

769
  STREAM_CHECK_RET_GOTO(nodesCloneNode((SNode*)pCol, (SNode**)&pCol1));
263,150!
770

771
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&pVal));
263,150!
772
  pVal->node.resType.type = TSDB_DATA_TYPE_BIGINT;
263,150✔
773
  pVal->node.resType.bytes = LONG_BYTES;
263,150✔
774
  pVal->datum.i = start;
263,150✔
775
  pVal->typeData = start;
263,150✔
776

777
  STREAM_CHECK_RET_GOTO(nodesCloneNode((SNode*)pVal, (SNode**)&pVal1));
263,150!
778
  pVal1->datum.i = end;
263,150✔
779
  pVal1->typeData = end;
263,150✔
780

781
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&op));
263,150!
782
  op->opType = OP_TYPE_GREATER_EQUAL;
263,150✔
783
  op->node.resType.type = TSDB_DATA_TYPE_BOOL;
263,150✔
784
  op->node.resType.bytes = CHAR_BYTES;
263,150✔
785
  op->pLeft = (SNode*)pCol;
263,150✔
786
  op->pRight = (SNode*)pVal;
263,150✔
787
  pCol = NULL;
263,150✔
788
  pVal = NULL;
263,150✔
789

790
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&op1));
263,150!
791
  op1->opType = OP_TYPE_LOWER_EQUAL;
263,150✔
792
  op1->node.resType.type = TSDB_DATA_TYPE_BOOL;
263,150✔
793
  op1->node.resType.bytes = CHAR_BYTES;
263,150✔
794
  op1->pLeft = (SNode*)pCol1;
263,150✔
795
  op1->pRight = (SNode*)pVal1;
263,150✔
796
  pCol1 = NULL;
263,150✔
797
  pVal1 = NULL;
263,150✔
798

799
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_LOGIC_CONDITION, (SNode**)&cond));
263,150!
800
  cond->condType = LOGIC_COND_TYPE_AND;
263,150✔
801
  cond->node.resType.type = TSDB_DATA_TYPE_BOOL;
263,150✔
802
  cond->node.resType.bytes = CHAR_BYTES;
263,150✔
803
  STREAM_CHECK_RET_GOTO(nodesMakeList(&cond->pParameterList));
263,150!
804
  STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)op));
263,150!
805
  op = NULL;
263,150✔
806
  STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)op1));
263,150!
807
  op1 = NULL;
263,150✔
808

809
  *pCond = cond;
263,150✔
810

811
end:
263,150✔
812
  if (code != 0) {
263,150!
813
    nodesDestroyNode((SNode*)pCol);
×
814
    nodesDestroyNode((SNode*)pCol1);
×
815
    nodesDestroyNode((SNode*)pVal);
×
816
    nodesDestroyNode((SNode*)pVal1);
×
817
    nodesDestroyNode((SNode*)op);
×
818
    nodesDestroyNode((SNode*)op1);
×
819
    nodesDestroyNode((SNode*)cond);
×
820
  }
821
  STREAM_PRINT_LOG_END(code, lino);
263,150!
822

823
  return code;
263,150✔
824
}
825

826
/*
 * Builds an OR node whose children are one `(ts >= skey AND ts <= ekey)` condition per entry of
 * data->pStreamPesudoFuncVals.
 *
 * data           runtime function info; curIdx is advanced through every entry while iterating
 * pCond          out: the OR node on success; untouched on failure
 * pTargetNodeTs  timestamp target node; NULL is rejected with TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN
 * node           time range node evaluated by calcTimeRange for each entry
 *
 * Entries for which calcTimeRange reports an invalid range are logged and skipped.
 * Returns 0 on success, otherwise an error code.
 */
static int32_t createExternalConditions(SStreamRuntimeFuncInfo* data, SLogicConditionNode** pCond, STargetNode* pTargetNodeTs, STimeRangeNode* node) {
  int32_t              code = 0;
  int32_t              lino = 0;
  SLogicConditionNode* rangeCond = NULL;
  SLogicConditionNode* orCond = NULL;

  if (pTargetNodeTs == NULL) {
    vError("stream reader %s no ts column", __func__);
    return TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN;
  }

  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_LOGIC_CONDITION, (SNode**)&orCond));
  orCond->condType = LOGIC_COND_TYPE_OR;
  orCond->node.resType.type = TSDB_DATA_TYPE_BOOL;
  orCond->node.resType.bytes = CHAR_BYTES;
  STREAM_CHECK_RET_GOTO(nodesMakeList(&orCond->pParameterList));

  for (int i = 0; i < taosArrayGetSize(data->pStreamPesudoFuncVals); ++i) {
    SReadHandle handle = {0};

    // calcTimeRange is given `data` with curIdx pointing at the entry being processed.
    data->curIdx = i;
    calcTimeRange(node, data, &handle.winRange, &handle.winRangeValid);

    if (handle.winRangeValid) {
      STREAM_CHECK_RET_GOTO(createTSAndCondition(handle.winRange.skey, handle.winRange.ekey, &rangeCond, pTargetNodeTs));
      stDebug("%s create condition skey:%" PRId64 ", eksy:%" PRId64, __func__, handle.winRange.skey, handle.winRange.ekey);
      STREAM_CHECK_RET_GOTO(nodesListAppend(orCond->pParameterList, (SNode*)rangeCond));
      // The list owns the node now; clear the local so the error path leaves it alone.
      rangeCond = NULL;
    } else {
      stError("stream reader %s invalid time range, skey:%" PRId64 ", ekey:%" PRId64, __func__, handle.winRange.skey,
              handle.winRange.ekey);
    }
  }

  *pCond = orCond;

end:
  if (code != 0) {
    nodesDestroyNode((SNode*)rangeCond);
    nodesDestroyNode((SNode*)orCond);
  }
  STREAM_PRINT_LOG_END(code, lino);

  return code;
}
869

870
static int32_t processCalaTimeRange(SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo, SResFetchReq* req,
15,386✔
871
                                    STimeRangeNode* node, SReadHandle* handle) {
872
  int32_t code = 0;
15,386✔
873
  int32_t lino = 0;
15,386✔
874
  SArray* funcVals = NULL;
15,386✔
875
  if (req->pStRtFuncInfo->withExternalWindow) {
15,386✔
876
    nodesDestroyNode(sStreamReaderCalcInfo->tsConditions);
184✔
877
    filterFreeInfo(sStreamReaderCalcInfo->pFilterInfo);
184✔
878
    sStreamReaderCalcInfo->pFilterInfo = NULL;
184✔
879

880
    STREAM_CHECK_RET_GOTO(createExternalConditions(req->pStRtFuncInfo,
184!
881
                                                   (SLogicConditionNode**)&sStreamReaderCalcInfo->tsConditions,
882
                                                   sStreamReaderCalcInfo->pTargetNodeTs, node));
883

884
    STREAM_CHECK_RET_GOTO(filterInitFromNode((SNode*)sStreamReaderCalcInfo->tsConditions,
184!
885
                                             (SFilterInfo**)&sStreamReaderCalcInfo->pFilterInfo,
886
                                             FLT_OPTION_NO_REWRITE | FLT_OPTION_SCALAR_MODE, NULL));
887
    SSTriggerCalcParam* pFirst = taosArrayGet(req->pStRtFuncInfo->pStreamPesudoFuncVals, 0);
184✔
888
    SSTriggerCalcParam* pLast = taosArrayGetLast(req->pStRtFuncInfo->pStreamPesudoFuncVals);
184✔
889
    STREAM_CHECK_NULL_GOTO(pFirst, terrno);
184!
890
    STREAM_CHECK_NULL_GOTO(pLast, terrno);
184!
891

892
    handle->winRange.skey = pFirst->wstart;
184✔
893
    handle->winRange.ekey = pLast->wend;
184✔
894
    handle->winRangeValid = true;
184✔
895
    stDebug("%s withExternalWindow is true, skey:%" PRId64 ", ekey:%" PRId64, __func__, pFirst->wstart, pLast->wend);
184✔
896
  } else {
897
    calcTimeRange(node, req->pStRtFuncInfo, &handle->winRange, &handle->winRangeValid);
15,202✔
898
  }
899

900
end:
15,386✔
901
  taosArrayDestroy(funcVals);
15,386✔
902
  return code;
15,386✔
903
}
904

905
/*
 * Creates the data block used to return WAL meta rows.
 *
 * Column layout, in order:
 *   type (TINYINT), gid (BIGINT, omitted when isVTable is true), uid, skey, ekey, ver, nrows
 *   (all BIGINT).
 *
 * pBlock    out: the new block
 * isVTable  true drops the gid column
 *
 * Returns 0 on success, otherwise an error code. The temporary schema array is always released.
 */
static int32_t createBlockForWalMeta(SSDataBlock** pBlock, bool isVTable) {
  int32_t code = 0;
  int32_t lino = 0;
  SArray* schemas = taosArrayInit(8, sizeof(SSchema));
  STREAM_CHECK_NULL_GOTO(schemas, terrno);

  int32_t index = 0;
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TINYINT, CHAR_BYTES, index++));  // type

  // uid, skey, ekey, ver, nrows -- preceded by gid for non-virtual tables.
  int32_t numBigintCols = isVTable ? 5 : 6;
  for (int32_t k = 0; k < numBigintCols; ++k) {
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++));
  }

  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, pBlock));

end:
  taosArrayDestroy(schemas);
  return code;
}
930

931
static int32_t createOptionsForLastTs(SStreamTriggerReaderTaskInnerOptions* options,
316✔
932
                                      SStreamTriggerReaderInfo*             sStreamReaderInfo) {
933
  int32_t code = 0;
316✔
934
  int32_t lino = 0;
316✔
935
  SArray* schemas = NULL;
316✔
936

937
  schemas = taosArrayInit(4, sizeof(SSchema));
316✔
938
  STREAM_CHECK_NULL_GOTO(schemas, terrno)
316!
939
  STREAM_CHECK_RET_GOTO(
316!
940
      qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, PRIMARYKEY_TIMESTAMP_COL_ID))  // last ts
941

942
  BUILD_OPTION(op, sStreamReaderInfo, -1, true, TSDB_ORDER_DESC, INT64_MIN, INT64_MAX, schemas, true,
316✔
943
               STREAM_SCAN_GROUP_ONE_BY_ONE, 0, sStreamReaderInfo->uidList == NULL, NULL);
944
  schemas = NULL;
316✔
945
  *options = op;
316✔
946

947
end:
316✔
948
  taosArrayDestroy(schemas);
316✔
949
  return code;
316✔
950
}
951

952
static int32_t createOptionsForFirstTs(SStreamTriggerReaderTaskInnerOptions* options,
319✔
953
                                       SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t start, int64_t ver) {
954
  int32_t code = 0;
319✔
955
  int32_t lino = 0;
319✔
956
  SArray* schemas = NULL;
319✔
957

958
  schemas = taosArrayInit(4, sizeof(SSchema));
319✔
959
  STREAM_CHECK_NULL_GOTO(schemas, terrno)
319!
960
  STREAM_CHECK_RET_GOTO(
319!
961
      qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, PRIMARYKEY_TIMESTAMP_COL_ID))  // first ts
962

963
  BUILD_OPTION(op, sStreamReaderInfo, ver, true, TSDB_ORDER_ASC, start, INT64_MAX, schemas, true,
319✔
964
               STREAM_SCAN_GROUP_ONE_BY_ONE, 0, sStreamReaderInfo->uidList == NULL, NULL);
965
  schemas = NULL;
319✔
966

967
  *options = op;
319✔
968
end:
319✔
969
  taosArrayDestroy(schemas);
319✔
970
  return code;
319✔
971
}
972

973
static int32_t createOptionsForTsdbMeta(SStreamTriggerReaderTaskInnerOptions* options,
672✔
974
                                        SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t start, int64_t end,
975
                                        int64_t gid, int8_t order, int64_t ver, bool onlyTs) {
976
  int32_t code = 0;
672✔
977
  int32_t lino = 0;
672✔
978
  SArray* schemas = NULL;
672✔
979

980
  int32_t index = 1;
672✔
981
  schemas = taosArrayInit(8, sizeof(SSchema));
672✔
982
  STREAM_CHECK_NULL_GOTO(schemas, terrno);
672!
983
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, index++))  // skey
672!
984
  if (!onlyTs){
672✔
985
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, index++))  // ekey
336!
986
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // uid
336!
987
    if (sStreamReaderInfo->uidList == NULL) {
336✔
988
      STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_UBIGINT, LONG_BYTES, index++))  // gid
131!
989
    }
990
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))     // nrows
336!
991
  }
992
  
993
  BUILD_OPTION(op, sStreamReaderInfo, ver, true, order, start, end, schemas, true, (gid != 0 ? STREAM_SCAN_GROUP_ONE_BY_ONE : STREAM_SCAN_ALL), gid,
672✔
994
               true, sStreamReaderInfo->uidList);
995
  schemas = NULL;
672✔
996
  *options = op;
672✔
997

998
end:
672✔
999
  taosArrayDestroy(schemas);
672✔
1000
  return code;
672✔
1001
}
1002

1003
/*
 * Ascending comparator over the int64_t each argument points at.
 * Returns -1, 0 or 1 when the first value is smaller than, equal to or greater than the second.
 */
static int taosCompareInt64Asc(const void* elem1, const void* elem2) {
  const int64_t lhs = *(const int64_t*)elem1;
  const int64_t rhs = *(const int64_t*)elem2;

  // Difference of two comparisons yields the sign without any risk of subtraction overflow.
  return (lhs > rhs) - (lhs < rhs);
}
1013

1014
static int32_t getTableList(SArray** pList, int32_t* pNum, int64_t* suid, int32_t index,
63✔
1015
                            SStreamTriggerReaderInfo* sStreamReaderInfo) {
1016
  int32_t code = 0;
63✔
1017
  int32_t lino = 0;
63✔
1018

1019
  int32_t* start = taosArrayGet(sStreamReaderInfo->uidListIndex, index);
63✔
1020
  STREAM_CHECK_NULL_GOTO(start, terrno);
63!
1021
  int32_t  iStart = *start;
63✔
1022
  int32_t* end = taosArrayGet(sStreamReaderInfo->uidListIndex, index + 1);
63✔
1023
  STREAM_CHECK_NULL_GOTO(end, terrno);
63!
1024
  int32_t iEnd = *end;
63✔
1025
  *pList = taosArrayInit(iEnd - iStart, sizeof(STableKeyInfo));
63✔
1026
  STREAM_CHECK_NULL_GOTO(*pList, terrno);
63!
1027
  for (int32_t i = iStart; i < iEnd; ++i) {
177✔
1028
    int64_t*       uid = taosArrayGet(sStreamReaderInfo->uidList, i);
114✔
1029
    STableKeyInfo* info = taosArrayReserve(*pList, 1);
114✔
1030
    STREAM_CHECK_NULL_GOTO(info, terrno);
114!
1031
    *suid = uid[0];
114✔
1032
    info->uid = uid[1];
114✔
1033
  }
1034
  *pNum = iEnd - iStart;
63✔
1035
end:
63✔
1036
  STREAM_PRINT_LOG_END(code, lino);
63!
1037
  return code;
63✔
1038
}
1039

1040
static int32_t processTsNonVTable(SVnode* pVnode, SStreamTsResponse* tsRsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
571✔
1041
                                  SStreamReaderTaskInner* pTask) {
1042
  int32_t code = 0;
571✔
1043
  int32_t lino = 0;
571✔
1044

1045
  tsRsp->tsInfo = taosArrayInit(qStreamGetTableListGroupNum(pTask->pTableList), sizeof(STsInfo));
571✔
1046
  STREAM_CHECK_NULL_GOTO(tsRsp->tsInfo, terrno);
572!
1047
  while (true) {
575✔
1048
    bool hasNext = false;
1,147✔
1049
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTask, &hasNext));
1,147!
1050
    if (hasNext) {
1,147✔
1051
      pTask->api.tsdReader.tsdReaderReleaseDataBlock(pTask->pReader);
672✔
1052
      STsInfo* tsInfo = taosArrayReserve(tsRsp->tsInfo, 1);
672✔
1053
      STREAM_CHECK_NULL_GOTO(tsInfo, terrno)
672!
1054
      if (pTask->options.order == TSDB_ORDER_ASC) {
672✔
1055
        tsInfo->ts = pTask->pResBlock->info.window.skey;
548✔
1056
      } else {
1057
        tsInfo->ts = pTask->pResBlock->info.window.ekey;
124✔
1058
      }
1059
      tsInfo->gId = qStreamGetGroupId(pTask->pTableList, pTask->pResBlock->info.id.uid);
672✔
1060
      stDebug("vgId:%d %s get last ts:%" PRId64 ", gId:%" PRIu64 ", ver:%" PRId64, TD_VID(pVnode), __func__, tsInfo->ts,
672✔
1061
              tsInfo->gId, tsRsp->ver);
1062
    }
1063

1064
    pTask->currentGroupIndex++;
1,147✔
1065
    if (pTask->currentGroupIndex >= qStreamGetTableListGroupNum(pTask->pTableList)) {
1,147✔
1066
      break;
571✔
1067
    }
1068
    STREAM_CHECK_RET_GOTO(resetTsdbReader(pTask));
575!
1069
  }
1070

1071
end:
571✔
1072
  return code;
571✔
1073
}
1074

1075
static int32_t processTsVTable(SVnode* pVnode, SStreamTsResponse* tsRsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
63✔
1076
                               SStreamReaderTaskInner* pTask, int64_t ver) {
1077
  int32_t code = 0;
63✔
1078
  int32_t lino = 0;
63✔
1079
  int64_t suid = 0;
63✔
1080
  SArray* pList = NULL;
63✔
1081
  int32_t pNum = 0;
63✔
1082

1083
  tsRsp->tsInfo = taosArrayInit(taosArrayGetSize(sStreamReaderInfo->uidList), sizeof(STsInfo));
63✔
1084
  STREAM_CHECK_NULL_GOTO(tsRsp->tsInfo, terrno);
63!
1085
  for (int32_t i = 0; i < taosArrayGetSize(sStreamReaderInfo->uidListIndex) - 1; ++i) {
126✔
1086
    STREAM_CHECK_RET_GOTO(getTableList(&pList, &pNum, &suid, i, sStreamReaderInfo));
63!
1087

1088
    cleanupQueryTableDataCond(&pTask->cond);
63✔
1089
    STREAM_CHECK_RET_GOTO(qStreamInitQueryTableDataCond(&pTask->cond, pTask->options.order, pTask->options.schemas,
63!
1090
                                                        pTask->options.isSchema, pTask->options.twindows, suid, ver));
1091
    STREAM_CHECK_RET_GOTO(pTask->api.tsdReader.tsdReaderOpen(
63!
1092
        pVnode, &pTask->cond, taosArrayGet(pList, 0), pNum, pTask->pResBlock, (void**)&pTask->pReader, pTask->idStr, NULL));
1093
    taosArrayDestroy(pList);
63✔
1094
    pList = NULL;
63✔
1095
    while (true) {
72✔
1096
      bool hasNext = false;
135✔
1097
      STREAM_CHECK_RET_GOTO(getTableDataInfo(pTask, &hasNext));
135!
1098
      if (!hasNext) {
135✔
1099
        break;
63✔
1100
      }
1101
      pTask->api.tsdReader.tsdReaderReleaseDataBlock(pTask->pReader);
72✔
1102
      STsInfo* tsInfo = taosArrayReserve(tsRsp->tsInfo, 1);
72✔
1103
      STREAM_CHECK_NULL_GOTO(tsInfo, terrno)
72!
1104
      if (pTask->options.order == TSDB_ORDER_ASC) {
72✔
1105
        tsInfo->ts = pTask->pResBlock->info.window.skey;
38✔
1106
      } else {
1107
        tsInfo->ts = pTask->pResBlock->info.window.ekey;
34✔
1108
      }
1109
      tsInfo->gId = pTask->pResBlock->info.id.uid;
72✔
1110
      stDebug("vgId:%d %s get vtable last ts:%" PRId64 ", uid:%" PRIu64 ", ver:%" PRId64, TD_VID(pVnode), __func__,
72✔
1111
              tsInfo->ts, tsInfo->gId, tsRsp->ver);
1112
    }
1113
    pTask->api.tsdReader.tsdReaderClose(pTask->pReader);
63✔
1114
    pTask->pReader = NULL;
63✔
1115
  }
1116

1117
end:
63✔
1118
  taosArrayDestroy(pList);
63✔
1119
  return code;
63✔
1120
}
1121

1122
/*
 * Overrides the table identity stored in `options`.
 *
 * suid replaces options->suid only when it is non-zero; uid always replaces options->uid.
 * tableType then follows the resulting suid: child table when non-zero, normal table otherwise.
 */
static void reSetUid(SStreamTriggerReaderTaskInnerOptions* options, int64_t suid, int64_t uid) {
  options->uid = uid;
  if (suid != 0) {
    options->suid = suid;
  }
  options->tableType = (options->suid != 0) ? TD_CHILD_TABLE : TD_NORMAL_TABLE;
}
181✔
1131

1132
static int32_t createOptionsForTsdbData(SVnode* pVnode, SStreamTriggerReaderTaskInnerOptions* options,
144✔
1133
                                        SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t uid, SArray* cols,
1134
                                        int8_t order,int64_t skey, int64_t ekey, int64_t ver) {
1135
  int32_t code = 0;
144✔
1136
  int32_t lino = 0;
144✔
1137
  SArray* schemas = NULL;
144✔
1138

1139
  STREAM_CHECK_RET_GOTO(buildScheamFromMeta(pVnode, uid, &schemas));
144!
1140
  STREAM_CHECK_RET_GOTO(shrinkScheams(cols, schemas));
144!
1141
  BUILD_OPTION(op, sStreamReaderInfo, ver, true, order, skey, ekey, schemas, true, STREAM_SCAN_ALL, 0, false, NULL);
144✔
1142
  *options = op;
144✔
1143

1144
end:
144✔
1145
  return code;
144✔
1146
}
1147

1148
/*
 * Handles a set-table pull request: installs the request's uid list on the reader info and
 * rebuilds the two lookup structures derived from it.
 *
 * The uid list is swapped in from the request and sorted by the first int64_t of each entry.
 * uidListIndex then receives the position of every entry whose first element is 0 or differs
 * from the previous entry's, followed by the total count as a terminator. uidHash maps the
 * second element of each entry to the entry's position.
 *
 * A response with an empty payload and the resulting code is always sent.
 * Returns 0 on success, otherwise an error code.
 *
 * NOTE(review): when sStreamReaderInfo is NULL the jump to `end` skips the initialisation of
 * pTask, and the reported code is whatever terrno holds -- confirm callers never pass NULL.
 */
static int32_t vnodeProcessStreamSetTableReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;

  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  TSWAP(sStreamReaderInfo->uidList, req->setTableReq.uids);
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidList, TSDB_CODE_INVALID_PARA);

  taosArraySort(sStreamReaderInfo->uidList, taosCompareInt64Asc);

  // Reuse the index array from an earlier request when there is one.
  if (sStreamReaderInfo->uidListIndex == NULL) {
    sStreamReaderInfo->uidListIndex = taosArrayInit(taosArrayGetSize(sStreamReaderInfo->uidList), sizeof(int32_t));
    STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidListIndex, TSDB_CODE_INVALID_PARA);
  } else {
    taosArrayClear(sStreamReaderInfo->uidListIndex);
  }

  // Same for the uid -> position hash.
  if (sStreamReaderInfo->uidHash == NULL) {
    sStreamReaderInfo->uidHash = taosHashInit(taosArrayGetSize(sStreamReaderInfo->uidList),
                                              taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidHash, TSDB_CODE_INVALID_PARA);
  } else {
    taosHashClear(sStreamReaderInfo->uidHash);
  }

  int64_t prevSuid = 0;
  int32_t numUids = taosArrayGetSize(sStreamReaderInfo->uidList);
  for (int32_t i = 0; i < numUids; ++i) {
    int64_t* pair = taosArrayGet(sStreamReaderInfo->uidList, i);
    STREAM_CHECK_NULL_GOTO(pair, terrno);

    // An entry opens a new slice when its first element is 0 or changes.
    bool startsSlice = (pair[0] == 0) || (pair[0] != prevSuid);
    if (startsSlice) {
      STREAM_CHECK_NULL_GOTO(taosArrayPush(sStreamReaderInfo->uidListIndex, &i), terrno);
    }
    prevSuid = pair[0];
    stDebug("vgId:%d %s suid:%" PRId64 ",uid:%" PRId64 ", index:%d", TD_VID(pVnode), __func__, prevSuid, pair[1], i);
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->uidHash, pair + 1, LONG_BYTES, &i, sizeof(int32_t)));
  }
  // Terminator: the end position of the last slice.
  STREAM_CHECK_NULL_GOTO(taosArrayPush(sStreamReaderInfo->uidListIndex, &numUids), terrno);

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  return code;
}
1198

1199
/*
 * Handles a last-timestamp pull request: scans for the latest timestamps and sends them back
 * together with the vnode's applied version.
 *
 * When the reader info carries a uid list the per-table variant is used, otherwise the
 * per-group variant. A response is always sent; it carries the encoded result on success and
 * the error code on failure.
 *
 * Returns 0 on success, otherwise an error code.
 *
 * NOTE(review): when sStreamReaderInfo is NULL the jump to `end` skips the initialisation of
 * pTask, and the reported code is whatever terrno holds -- confirm callers never pass NULL.
 */
static int32_t vnodeProcessStreamLastTsReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  SStreamReaderTaskInner* pTaskInner = NULL;
  SStreamTsResponse       lastTsRsp = {0};
  void*                   buf = NULL;
  size_t                  size = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;

  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStreamTriggerReaderTaskInnerOptions options = {0};
  STREAM_CHECK_RET_GOTO(createOptionsForLastTs(&options, sStreamReaderInfo));
  SStorageAPI api = {0};
  initStorageAPI(&api);
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, NULL, &api));

  lastTsRsp.ver = pVnode->state.applied;
  if (sStreamReaderInfo->uidList == NULL) {
    STREAM_CHECK_RET_GOTO(processTsNonVTable(pVnode, &lastTsRsp, sStreamReaderInfo, pTaskInner));
  } else {
    STREAM_CHECK_RET_GOTO(processTsVTable(pVnode, &lastTsRsp, sStreamReaderInfo, pTaskInner, -1));
  }
  ST_TASK_DLOG("vgId:%d %s get result", TD_VID(pVnode), __func__);
  STREAM_CHECK_RET_GOTO(buildTsRsp(&lastTsRsp, &buf, &size));

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  taosArrayDestroy(lastTsRsp.tsInfo);
  releaseStreamTask(&pTaskInner);
  return code;
}
1237

1238
/*
 * Handles a first-timestamp pull request: scans from the requested start time and version for
 * the earliest timestamps and sends them back together with the vnode's applied version.
 *
 * When the reader info carries a uid list the per-table variant is used, otherwise the
 * per-group variant. A response is always sent; it carries the encoded result on success and
 * the error code on failure.
 *
 * Returns 0 on success, otherwise an error code.
 *
 * NOTE(review): when sStreamReaderInfo is NULL the jump to `end` skips the initialisation of
 * pTask, and the reported code is whatever terrno holds -- confirm callers never pass NULL.
 */
static int32_t vnodeProcessStreamFirstTsReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  SStreamReaderTaskInner* pTaskInner = NULL;
  SStreamTsResponse       firstTsRsp = {0};
  void*                   buf = NULL;
  size_t                  size = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStreamTriggerReaderTaskInnerOptions options = {0};
  STREAM_CHECK_RET_GOTO(createOptionsForFirstTs(&options, sStreamReaderInfo, req->firstTsReq.startTime, req->firstTsReq.ver));
  SStorageAPI api = {0};
  initStorageAPI(&api);
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, NULL, &api));

  firstTsRsp.ver = pVnode->state.applied;
  if (sStreamReaderInfo->uidList == NULL) {
    STREAM_CHECK_RET_GOTO(processTsNonVTable(pVnode, &firstTsRsp, sStreamReaderInfo, pTaskInner));
  } else {
    STREAM_CHECK_RET_GOTO(processTsVTable(pVnode, &firstTsRsp, sStreamReaderInfo, pTaskInner, req->firstTsReq.ver));
  }

  ST_TASK_DLOG("vgId:%d %s get result", TD_VID(pVnode), __func__);
  STREAM_CHECK_RET_GOTO(buildTsRsp(&firstTsRsp, &buf, &size));

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  taosArrayDestroy(firstTsRsp.tsInfo);
  releaseStreamTask(&pTaskInner);
  return code;
}
1274

1275
/*
 * Handles a TSDB meta pull request: returns one row of block meta (skey, ekey, uid, gid when
 * the reader has no uid list, row count) per data block, at most STREAM_RETURN_ROWS_NUM rows
 * per response.
 *
 * For a STRIGGER_PULL_TSDB_META request a new inner task is created and registered in
 * streamTaskMap under the session key; any other request type continues the task previously
 * registered under that key. The task is removed from the map once the scan is exhausted.
 *
 * A response is always sent; it carries the encoded block on success and the error code on
 * failure. Returns 0 on success, otherwise an error code.
 *
 * NOTE(review): when sStreamReaderInfo is NULL the jump to `end` skips the initialisation of
 * pTask, and the reported code is whatever terrno holds -- confirm callers never pass NULL.
 */
static int32_t vnodeProcessStreamTsdbMetaReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;
  SStreamTriggerReaderTaskInnerOptions options = {0};

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStreamReaderTaskInner* pTaskInner = NULL;
  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_META);

  if (req->base.type == STRIGGER_PULL_TSDB_META) {
    SStreamTriggerReaderTaskInnerOptions optionsTs = {0};

    // The scan itself only needs the timestamp column.
    STREAM_CHECK_RET_GOTO(createOptionsForTsdbMeta(&optionsTs, sStreamReaderInfo, req->tsdbMetaReq.startTime,
      req->tsdbMetaReq.endTime, req->tsdbMetaReq.gid, req->tsdbMetaReq.order, req->tsdbMetaReq.ver, true));
    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &optionsTs, &pTaskInner, NULL, sStreamReaderInfo->groupIdMap, &api));

    code = taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner));
    if (code != 0) {
      // The map did not take the task, so nothing else references it; release it here or it
      // is lost.
      releaseStreamTask(&pTaskInner);
      lino = __LINE__;
      goto end;
    }

    // The full meta schema is only used to lay out the result block.
    STREAM_CHECK_RET_GOTO(createOptionsForTsdbMeta(&options, sStreamReaderInfo, req->tsdbMetaReq.startTime,
      req->tsdbMetaReq.endTime, req->tsdbMetaReq.gid, req->tsdbMetaReq.order, req->tsdbMetaReq.ver, false));
    STREAM_CHECK_RET_GOTO(createDataBlockForStream(options.schemas, &pTaskInner->pResBlockDst));
  } else {
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  }

  blockDataCleanup(pTaskInner->pResBlockDst);
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pTaskInner->pResBlockDst, STREAM_RETURN_ROWS_NUM));
  bool hasNext = true;
  while (true) {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
    if (!hasNext) {
      break;
    }
    pTaskInner->api.tsdReader.tsdReaderReleaseDataBlock(pTaskInner->pReader);
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);

    int32_t index = 0;
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.window.skey));
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.window.ekey));
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.id.uid));
    if (sStreamReaderInfo->uidList == NULL) {
      STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.id.groupId));
    }
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.rows));

    stDebug("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
    pTaskInner->pResBlockDst->info.rows++;
    // Stop at a full page; hasNext stays true so the task is kept for the follow-up request.
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
      break;
    }
  }

  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
  if (!hasNext) {
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  taosArrayDestroy(options.schemas);
  return code;
}
1352

1353
/*
 * Serves a TSDB "ts data" pull request.
 *
 * Creates a one-shot reader task from the version, time range and suid/uid carried in
 * req->tsdbTsDataReq, merges every block the reader yields into pTaskInner->pResBlockDst,
 * passes the merged block through blockDataTransform() into a block created from
 * sStreamReaderInfo->tsBlock and serializes that block as the response payload.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP carrying `code` is sent in every case; the reader task and
 * the result block are released before returning.
 *
 * Returns 0 on success, otherwise the error code of the first failing step.
 */
static int32_t vnodeProcessStreamTsdbTsDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  SStreamReaderTaskInner* pTaskInner = NULL;
  SSDataBlock*            pTsRes = NULL;
  void*                   rspBuf = NULL;
  size_t                  rspLen = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  BUILD_OPTION(options, sStreamReaderInfo, req->tsdbTsDataReq.ver, true, TSDB_ORDER_ASC, req->tsdbTsDataReq.skey, req->tsdbTsDataReq.ekey,
               sStreamReaderInfo->triggerCols, false, STREAM_SCAN_ALL, 0, true, NULL);
  // The request names the table to read explicitly.
  reSetUid(&options, req->tsdbTsDataReq.suid, req->tsdbTsDataReq.uid);

  SStorageAPI api = {0};
  initStorageAPI(&api);
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, NULL, &api));
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->tsBlock, false, &pTsRes));

  for (;;) {
    bool moreBlocks = false;
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &moreBlocks));
    if (!moreBlocks) {
      break;
    }

    SSDataBlock* pInfoBlock = pTaskInner->pResBlock;
    pInfoBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pInfoBlock->info.id.uid);

    SSDataBlock* pData = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pData));

    // Tag expressions are only evaluated for blocks that actually carry rows.
    bool hasRows = (pData != NULL) && (pData->info.rows > 0);
    if (hasRows) {
      STREAM_CHECK_RET_GOTO(processTag(pVnode, sStreamReaderInfo->pExprInfo, sStreamReaderInfo->numOfExpr, &api, pData));
    }

    STREAM_CHECK_RET_GOTO(qStreamFilter(pData, pTaskInner->pFilterInfo));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pData));
    ST_TASK_DLOG("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
  }

  blockDataTransform(pTsRes, pTaskInner->pResBlockDst);

  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
  STREAM_CHECK_RET_GOTO(buildRsp(pTsRes, &rspBuf, &rspLen));

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);

  // The reply is sent unconditionally; on failure it carries the error code and no payload.
  SRpcMsg rsp = {0};
  rsp.msgType = TDMT_STREAM_TRIGGER_PULL_RSP;
  rsp.info = pMsg->info;
  rsp.pCont = rspBuf;
  rsp.contLen = rspLen;
  rsp.code = code;
  tmsgSendRsp(&rsp);

  blockDataDestroy(pTsRes);
  releaseStreamTask(&pTaskInner);
  return code;
}
1410

1411
/*
 * Serves a TSDB "trigger data" pull request and its follow-up ("next") requests.
 *
 * For a STRIGGER_PULL_TSDB_TRIGGER_DATA request a new reader task is created and stored in
 * sStreamReaderInfo->streamTaskMap under the session key; any other request type looks the
 * task up again under that key. Each call returns the next filtered block (the loop below
 * stops after the first merged block) and removes the task from the map once the reader
 * reports no more data.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP carrying `code` is sent in every case.
 * Returns 0 on success, otherwise the error code of the first failing step.
 */
static int32_t vnodeProcessStreamTsdbTriggerDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  void*                   buf = NULL;
  size_t                  size = 0;
  // Initialised before the first possible `goto end` because the cleanup path reads both.
  SStreamReaderTaskInner* pTaskInner = NULL;
  bool                    taskInMap = false;  // true once streamTaskMap holds pTaskInner

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  int64_t key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_TRIGGER_DATA);

  if (req->base.type == STRIGGER_PULL_TSDB_TRIGGER_DATA) {
    BUILD_OPTION(options, sStreamReaderInfo, req->tsdbTriggerDataReq.ver, true, req->tsdbTriggerDataReq.order, req->tsdbTriggerDataReq.startTime, INT64_MAX,
                 sStreamReaderInfo->triggerCols, false, (req->tsdbTriggerDataReq.gid != 0 ? STREAM_SCAN_GROUP_ONE_BY_ONE : STREAM_SCAN_ALL),
                 req->tsdbTriggerDataReq.gid, true, NULL);
    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock,
                                           sStreamReaderInfo->groupIdMap, &api));
    // Finish building the task before publishing it, so the map never holds a task
    // without a destination block.
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
    taskInMap = true;
  } else {
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
    taskInMap = true;
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  }

  blockDataCleanup(pTaskInner->pResBlockDst);
  bool hasNext = true;
  while (1) {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
    if (!hasNext) {
      break;
    }

    // One lookup serves both the source block and the destination block.
    uint64_t groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
    pTaskInner->pResBlock->info.id.groupId = groupId;
    pTaskInner->pResBlockDst->info.id.groupId = groupId;

    SSDataBlock* pBlock = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
    if (pBlock != NULL && pBlock->info.rows > 0) {
      STREAM_CHECK_RET_GOTO(
        processTag(pVnode, sStreamReaderInfo->pExprInfo, sStreamReaderInfo->numOfExpr, &pTaskInner->api, pBlock));
    }
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, pTaskInner->pFilterInfo));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
    ST_TASK_DLOG("vgId:%d %s get skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
    // The condition always holds, so exactly one block is returned per request.
    if (pTaskInner->pResBlockDst->info.rows >= 0) { //todo
      break;
    }
  }

  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
  if (!hasNext) {
    // Reader exhausted: drop the session context from the map.
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);

  if (!taskInMap) {
    // Setup failed before the task reached the map, so nothing else will ever free it.
    releaseStreamTask(&pTaskInner);
  }
  return code;
}
1481

1482
/*
 * Serves a TSDB "calc data" pull request and its follow-up ("next") requests.
 *
 * For a STRIGGER_PULL_TSDB_CALC_DATA request a new group-by-group reader task is created
 * for req->tsdbCalcDataReq (ver, skey, ekey, gid) and stored in
 * sStreamReaderInfo->streamTaskMap under the session key; any other request type looks the
 * task up again. Filtered blocks are merged until the reader is exhausted or
 * STREAM_RETURN_ROWS_NUM rows are collected, then passed through blockDataTransform() into
 * a block created from sStreamReaderInfo->calcResBlock, which is serialized as the payload.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP carrying `code` is sent in every case.
 * Returns 0 on success, otherwise the error code of the first failing step.
 */
static int32_t vnodeProcessStreamTsdbCalcDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  void*                   buf = NULL;
  size_t                  size = 0;
  SSDataBlock*            pBlockRes = NULL;
  // Initialised before the first possible `goto end` because the cleanup path reads both.
  SStreamReaderTaskInner* pTaskInner = NULL;
  bool                    taskInMap = false;  // true once streamTaskMap holds pTaskInner

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, skey:%"PRId64",ekey:%"PRId64",gid:%"PRId64, TD_VID(pVnode), __func__,
    req->tsdbCalcDataReq.skey, req->tsdbCalcDataReq.ekey, req->tsdbCalcDataReq.gid);

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->triggerCols, TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN);

  int64_t key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_CALC_DATA);

  if (req->base.type == STRIGGER_PULL_TSDB_CALC_DATA) {
    BUILD_OPTION(options, sStreamReaderInfo, req->tsdbCalcDataReq.ver, true, TSDB_ORDER_ASC, req->tsdbCalcDataReq.skey, req->tsdbCalcDataReq.ekey,
                 sStreamReaderInfo->triggerCols, false, STREAM_SCAN_GROUP_ONE_BY_ONE, req->tsdbCalcDataReq.gid, true, NULL);
    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, NULL, &api));

    // Finish building the task before publishing it, so the map never holds a task
    // without a destination block.
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
    taskInMap = true;
  } else {
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
    taskInMap = true;
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  }

  blockDataCleanup(pTaskInner->pResBlockDst);
  bool hasNext = true;
  while (1) {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
    if (!hasNext) {
      break;
    }
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);

    SSDataBlock* pBlock = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, pTaskInner->pFilterInfo));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
    // Cap the size of a single response; the remainder is fetched by a follow-up request.
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
      break;
    }
  }

  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->calcResBlock, false, &pBlockRes));
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlockRes, pTaskInner->pResBlockDst->info.capacity));
  blockDataTransform(pBlockRes, pTaskInner->pResBlockDst);
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlockRes->info.rows);
  if (!hasNext) {
    // Reader exhausted: drop the session context from the map.
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  blockDataDestroy(pBlockRes);

  if (!taskInMap) {
    // Setup failed before the task reached the map, so nothing else will ever free it.
    releaseStreamTask(&pTaskInner);
  }
  return code;
}
1550

1551
/*
 * Serves a TSDB data pull request for a single table (req->tsdbDataReq.uid) and its
 * follow-up ("next") requests.
 *
 * For a STRIGGER_PULL_TSDB_DATA request a reader task is created, its query condition is
 * rebuilt from the task options, a tsdb reader is opened on exactly that one table, and the
 * task is stored in sStreamReaderInfo->streamTaskMap keyed by the table uid. Any other
 * request type looks the task up again. Blocks are merged until the reader is exhausted or
 * STREAM_RETURN_ROWS_NUM rows are collected.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP carrying `code` is sent in every case.
 * Returns 0 on success, otherwise the error code of the first failing step.
 */
static int32_t vnodeProcessStreamTsdbVirtalDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  void*                   buf = NULL;
  size_t                  size = 0;
  // Initialised before the first possible `goto end` because the cleanup path reads both.
  SStreamReaderTaskInner* pTaskInner = NULL;
  bool                    taskInMap = false;  // true once streamTaskMap holds pTaskInner

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  int64_t key = req->tsdbDataReq.uid;

  if (req->base.type == STRIGGER_PULL_TSDB_DATA) {
    SStreamTriggerReaderTaskInnerOptions options = {0};

    STREAM_CHECK_RET_GOTO(createOptionsForTsdbData(pVnode, &options, sStreamReaderInfo, req->tsdbDataReq.uid,
                                                   req->tsdbDataReq.cids, req->tsdbDataReq.order, req->tsdbDataReq.skey,
                                                   req->tsdbDataReq.ekey, req->tsdbDataReq.ver));
    reSetUid(&options, req->tsdbDataReq.suid, req->tsdbDataReq.uid);

    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, NULL, &api));

    // Replace the condition set up by createStreamTask and open the reader on this one table.
    STableKeyInfo keyInfo = {.uid = req->tsdbDataReq.uid};
    cleanupQueryTableDataCond(&pTaskInner->cond);
    STREAM_CHECK_RET_GOTO(qStreamInitQueryTableDataCond(&pTaskInner->cond, pTaskInner->options.order, pTaskInner->options.schemas,
                                                        pTaskInner->options.isSchema, pTaskInner->options.twindows,
                                                        pTaskInner->options.suid, pTaskInner->options.ver));
    STREAM_CHECK_RET_GOTO(pTaskInner->api.tsdReader.tsdReaderOpen(pVnode, &pTaskInner->cond, &keyInfo, 1, pTaskInner->pResBlock,
                                                             (void**)&pTaskInner->pReader, pTaskInner->idStr, NULL));

    // Finish building the task before publishing it, so the map never holds a task
    // without a destination block.
    STREAM_CHECK_RET_GOTO(createOneDataBlock(pTaskInner->pResBlock, false, &pTaskInner->pResBlockDst));
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
    taskInMap = true;
  } else {
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
    taskInMap = true;
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  }

  blockDataCleanup(pTaskInner->pResBlockDst);
  bool hasNext = true;
  while (1) {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
    if (!hasNext) {
      break;
    }

    SSDataBlock* pBlock = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
    // Cap the size of a single response; the remainder is fetched by a follow-up request.
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
      break;
    }
  }
  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
  if (!hasNext) {
    // Reader exhausted: drop the context from the map.
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);

  if (!taskInMap) {
    // Setup failed before the task reached the map, so nothing else will ever free it.
    releaseStreamTask(&pTaskInner);
  }
  return code;
}
1622

1623
/*
 * Serves a WAL meta pull request.
 *
 * Scans the WAL starting from req->walMetaReq.lastVer via scanWal(). When the stream has no
 * uid list a table list is built for the scan, otherwise sStreamReaderInfo->uidHash is used.
 *
 * Response:
 *   - rows found: the serialized meta block, code 0.
 *   - no rows:    code TSDB_CODE_STREAM_NO_DATA with an 8-byte payload holding the last
 *                 version reported by scanWal(); the function itself then returns 0.
 *   - failure with no block created: the error code and no payload.
 *
 * NOTE(review): when the block exists but is empty, TSDB_CODE_STREAM_NO_DATA also replaces
 * an earlier error code (e.g. a scanWal failure). That is kept as-is; confirm it is intended.
 */
static int32_t vnodeProcessStreamWalMetaReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t      code = 0;
  int32_t      lino = 0;
  void*        buf = NULL;
  size_t       size = 0;
  SSDataBlock* pBlock = NULL;
  void*        pTableList = NULL;
  SNodeList*   groupNew = NULL;
  int64_t      lastVer = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, request paras lastVer:%" PRId64 ",ctime:%" PRId64, TD_VID(pVnode), __func__,
  req->walMetaReq.lastVer, req->walMetaReq.ctime);

  bool isVTable = sStreamReaderInfo->uidList != NULL;
  if (!isVTable) {
    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(nodesCloneList(sStreamReaderInfo->partitionCols, &groupNew));
    STREAM_CHECK_RET_GOTO(qStreamCreateTableListForReader(
        pVnode, sStreamReaderInfo->suid, sStreamReaderInfo->uid, sStreamReaderInfo->tableType,
        groupNew, false, sStreamReaderInfo->pTagCond, sStreamReaderInfo->pTagIndexCond, &api,
        &pTableList, sStreamReaderInfo->groupIdMap));
  }

  STREAM_CHECK_RET_GOTO(createBlockForWalMeta(&pBlock, isVTable));
  STREAM_CHECK_RET_GOTO(scanWal(pVnode, isVTable ? sStreamReaderInfo->uidHash : pTableList, isVTable, pBlock,
                                req->walMetaReq.lastVer, sStreamReaderInfo->deleteReCalc,
                                sStreamReaderInfo->deleteOutTbl, req->walMetaReq.ctime, &lastVer));

  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlock->info.rows);
  STREAM_CHECK_RET_GOTO(buildRsp(pBlock, &buf, &size));
  printDataBlock(pBlock, __func__, "");

end:
  if (pBlock != NULL && pBlock->info.rows == 0) {
    // buildRsp() may already have produced a payload for the empty block; release it
    // before substituting the "no data" payload so it is not leaked.
    rpcFreeCont(buf);
    buf = rpcMallocCont(sizeof(int64_t));
    if (buf != NULL) {
      code = TSDB_CODE_STREAM_NO_DATA;
      *(int64_t *)buf = lastVer;
      size = sizeof(int64_t);
    } else {
      // Allocation failed: answer with the allocator's error and no payload.
      code = terrno;
      size = 0;
    }
  }
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  // "No data" is reported to the requester only; it is not a failure of this handler.
  if (code == TSDB_CODE_STREAM_NO_DATA){
    code = 0;
  }
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  nodesDestroyList(groupNew);
  blockDataDestroy(pBlock);
  qStreamDestroyTableList(pTableList);

  return code;
}
1678

1679
/*
 * Serves the WAL data pull requests. The request type selects the helper and result layout:
 *   STRIGGER_PULL_WAL_DATA          -> processWalVerDataVTable() with req->walDataReq.cids
 *   STRIGGER_PULL_WAL_CALC_DATA     -> processWalVerData() using calcResBlock
 *   STRIGGER_PULL_WAL_TRIGGER_DATA  -> processWalVerData() using triggerResBlock
 *   STRIGGER_PULL_WAL_TS_DATA       -> processWalVerData() using tsBlock
 * Any other type is rejected with TSDB_CODE_INVALID_PARA.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP carrying `code` is sent in every case.
 * Returns 0 on success, otherwise the error code of the first failing step.
 */
static int32_t vnodeProcessStreamWalDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t      code = 0;
  int32_t      lino = 0;
  void*        buf = NULL;
  size_t       size = 0;
  SSDataBlock* pBlock = NULL;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, request type:%d skey:%" PRId64 ",ekey:%" PRId64 ",uid:%" PRId64 ",ver:%" PRId64, TD_VID(pVnode),
  __func__, req->walDataReq.base.type, req->walDataReq.skey, req->walDataReq.ekey, req->walDataReq.uid, req->walDataReq.ver);

  STimeWindow window = {.skey = req->walDataReq.skey, .ekey = req->walDataReq.ekey};
  switch (req->walDataReq.base.type) {
    case STRIGGER_PULL_WAL_DATA:
      STREAM_CHECK_RET_GOTO(processWalVerDataVTable(pVnode, req->walDataReq.cids, req->walDataReq.ver,
        req->walDataReq.uid, &window, &pBlock));
      break;
    case STRIGGER_PULL_WAL_CALC_DATA:
      STREAM_CHECK_RET_GOTO(processWalVerData(pVnode, sStreamReaderInfo, req->walDataReq.ver, false,
        req->walDataReq.uid, &window, sStreamReaderInfo->calcResBlock, &pBlock));
      break;
    case STRIGGER_PULL_WAL_TRIGGER_DATA:
      STREAM_CHECK_RET_GOTO(processWalVerData(pVnode, sStreamReaderInfo, req->walDataReq.ver, true,
        req->walDataReq.uid, &window, sStreamReaderInfo->triggerResBlock, &pBlock));
      break;
    case STRIGGER_PULL_WAL_TS_DATA:
      STREAM_CHECK_RET_GOTO(processWalVerData(pVnode, sStreamReaderInfo, req->walDataReq.ver, false,
        req->walDataReq.uid, &window, sStreamReaderInfo->tsBlock, &pBlock));
      break;
    default:
      // No helper ran, so there is no block to log or serialize below.
      STREAM_CHECK_CONDITION_GOTO(true, TSDB_CODE_INVALID_PARA);
      break;
  }
  // Guard the dereferences below in case a helper reported success without a block.
  STREAM_CHECK_NULL_GOTO(pBlock, TSDB_CODE_INTERNAL_ERROR);

  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlock->info.rows);
  printDataBlock(pBlock, __func__, "");
  STREAM_CHECK_RET_GOTO(buildRsp(pBlock, &buf, &size));
end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);

  blockDataDestroy(pBlock);
  return code;
}
1719

1720
/*
 * Serves a "group column value" pull request: looks up the group info stored for
 * req->groupColValueReq.gid in sStreamReaderInfo->groupIdMap and returns it serialized with
 * tSerializeSStreamGroupInfo().
 *
 * The serializer is called twice: first with a NULL buffer to obtain the required length,
 * then with the allocated buffer. On any failure the buffer is released and the response
 * carries only the error code.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP carrying `code` is sent in every case.
 * Returns 0 on success, TSDB_CODE_STREAM_NO_CONTEXT when the group is unknown, otherwise the
 * error code of the first failing step.
 */
static int32_t vnodeProcessStreamGroupColValueReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, request gid:%" PRId64, TD_VID(pVnode), __func__, req->groupColValueReq.gid);

  // NOTE(review): the key length is POINTER_BYTES while the key is the gid field; this
  // presumably matches how groupIdMap entries are inserted. Verify against the writer.
  SArray** gInfo = taosHashGet(sStreamReaderInfo->groupIdMap, &req->groupColValueReq.gid, POINTER_BYTES);
  STREAM_CHECK_NULL_GOTO(gInfo, TSDB_CODE_STREAM_NO_CONTEXT);
  SStreamGroupInfo pGroupInfo = {0};
  pGroupInfo.gInfo = *gInfo;

  // Keep the serializer result in a signed variable: stored in the unsigned `size`, a
  // negative error value could never be detected by a `< 0` test.
  int32_t len = tSerializeSStreamGroupInfo(NULL, 0, &pGroupInfo, TD_VID(pVnode));
  STREAM_CHECK_CONDITION_GOTO(len < 0, len);
  buf = rpcMallocCont(len);
  STREAM_CHECK_NULL_GOTO(buf, terrno);
  len = tSerializeSStreamGroupInfo(buf, len, &pGroupInfo, TD_VID(pVnode));
  STREAM_CHECK_CONDITION_GOTO(len < 0, len);
  size = (size_t)len;
end:
  if (code != 0) {
    rpcFreeCont(buf);
    buf = NULL;
    size = 0;
  }
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);

  return code;
}
1754

1755
/*
 * Serves a "virtual table info" pull request.
 *
 * Builds the table list for the stream and, for every table in it, emits one VTableInfo
 * entry (uid, group id and column references read from the table's meta entry):
 *   - if the request lists exactly one column id and it is PRIMARYKEY_TIMESTAMP_COL_ID,
 *     all column references of the table are copied;
 *   - otherwise one slot per requested column id is produced, in request order, and a slot
 *     is filled only when the table has a column with that id and hasRef set (other slots
 *     stay zeroed).
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP carrying `code` is sent in every case.
 * Returns 0 on success, otherwise the error code of the first failing step.
 */
static int32_t vnodeProcessStreamVTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t              code = 0;
  int32_t              lino = 0;
  void*                buf = NULL;
  size_t               size = 0;
  SStreamMsgVTableInfo vTableInfo = {0};
  SMetaReader          metaReader = {0};
  SNodeList*           groupNew = NULL;
  void*                pTableList = NULL;
  SStorageAPI api = {0};
  initStorageAPI(&api);

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  STREAM_CHECK_RET_GOTO(nodesCloneList(sStreamReaderInfo->partitionCols, &groupNew));
  STREAM_CHECK_RET_GOTO(qStreamCreateTableListForReader(
      pVnode, sStreamReaderInfo->suid, sStreamReaderInfo->uid, sStreamReaderInfo->tableType,
      groupNew, true, sStreamReaderInfo->pTagCond, sStreamReaderInfo->pTagIndexCond, &api,
      &pTableList, sStreamReaderInfo->groupIdMap));

  SArray* cids = req->virTableInfoReq.cids;
  STREAM_CHECK_NULL_GOTO(cids, terrno);

  SArray* pTableListArray = qStreamGetTableArrayList(pTableList);
  STREAM_CHECK_NULL_GOTO(pTableListArray, terrno);

  vTableInfo.infos = taosArrayInit(taosArrayGetSize(pTableListArray), sizeof(VTableInfo));
  STREAM_CHECK_NULL_GOTO(vTableInfo.infos, terrno);
  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);

  // Both values depend only on the request, so they are computed once.
  const size_t numCids = taosArrayGetSize(cids);
  const bool   wantAllRefs = (numCids == 1 && *(col_id_t*)taosArrayGet(cids, 0) == PRIMARYKEY_TIMESTAMP_COL_ID);

  for (size_t i = 0; i < taosArrayGetSize(pTableListArray); i++) {
    STableKeyInfo* pKeyInfo = taosArrayGet(pTableListArray, i);
    if (pKeyInfo == NULL) {
      continue;
    }
    VTableInfo* vTable = taosArrayReserve(vTableInfo.infos, 1);
    STREAM_CHECK_NULL_GOTO(vTable, terrno);
    // Start from a clean entry so the cleanup at `end` never sees a stale pColRef pointer
    // if one of the steps below bails out.
    memset(vTable, 0, sizeof(*vTable));
    vTable->uid = pKeyInfo->uid;
    vTable->gId = pKeyInfo->groupId;

    // A failed lookup must not fall through: metaReader.me would not describe this table.
    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, pKeyInfo->uid));
    vTable->cols.version = metaReader.me.colRef.version;

    if (wantAllRefs) {
      vTable->cols.pColRef = taosMemoryCalloc(metaReader.me.colRef.nCols, sizeof(SColRef));
      STREAM_CHECK_CONDITION_GOTO(vTable->cols.pColRef == NULL && metaReader.me.colRef.nCols > 0, terrno);
      vTable->cols.nCols = metaReader.me.colRef.nCols;
      for (size_t j = 0; j < metaReader.me.colRef.nCols; j++) {
        memcpy(vTable->cols.pColRef + j, &metaReader.me.colRef.pColRef[j], sizeof(SColRef));
      }
    } else {
      vTable->cols.pColRef = taosMemoryCalloc(numCids, sizeof(SColRef));
      STREAM_CHECK_CONDITION_GOTO(vTable->cols.pColRef == NULL && numCids > 0, terrno);
      vTable->cols.nCols = numCids;
      for (size_t k = 0; k < numCids; k++) {
        col_id_t wanted = *(col_id_t*)taosArrayGet(cids, k);
        for (size_t j = 0; j < metaReader.me.colRef.nCols; j++) {
          if (metaReader.me.colRef.pColRef[j].hasRef && metaReader.me.colRef.pColRef[j].id == wanted) {
            memcpy(vTable->cols.pColRef + k, &metaReader.me.colRef.pColRef[j], sizeof(SColRef));
            break;
          }
        }
      }
    }
    tDecoderClear(&metaReader.coder);
  }
  ST_TASK_DLOG("vgId:%d %s end", TD_VID(pVnode), __func__);
  STREAM_CHECK_RET_GOTO(buildVTableInfoRsp(&vTableInfo, &buf, &size));

end:
  nodesDestroyList(groupNew);
  qStreamDestroyTableList(pTableList);
  tDestroySStreamMsgVTableInfo(&vTableInfo);
  api.metaReaderFn.clearReader(&metaReader);
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  return code;
}
1835

1836
/*
 * Serves an "original table info" pull request.
 *
 * For every (refTableName, refColName) pair in req->origTableInfoReq.cols the table is
 * looked up by name and one OTableInfoRsp entry is produced with:
 *   uid  - uid of the referenced table,
 *   suid - the super table uid for a child table, 0 for a normal table,
 *   cid  - id of the column whose name equals refColName; it stays 0 (and an error is
 *          logged) when no such column exists.
 * For a child table the column is searched in the super table's schema. A referenced table
 * that is neither a child nor a normal table fails the request with TSDB_CODE_INVALID_PARA.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP carrying `code` is sent in every case.
 * Returns 0 on success, otherwise the error code of the first failing step.
 */
static int32_t vnodeProcessStreamOTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req) {
  int32_t                   code = 0;
  int32_t                   lino = 0;
  void*                     buf = NULL;
  size_t                    size = 0;
  SSTriggerOrigTableInfoRsp oTableInfo = {0};
  SMetaReader               metaReader = {0};
  int64_t streamId = req->base.streamId;
  stsDebug("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStorageAPI api = {0};
  initStorageAPI(&api);

  SArray* cols = req->origTableInfoReq.cols;
  STREAM_CHECK_NULL_GOTO(cols, terrno);

  oTableInfo.cols = taosArrayInit(taosArrayGetSize(cols), sizeof(OTableInfoRsp));

  STREAM_CHECK_NULL_GOTO(oTableInfo.cols, terrno);

  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
  for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
    OTableInfo*    oInfo = taosArrayGet(cols, i);
    STREAM_CHECK_NULL_GOTO(oInfo, terrno);
    OTableInfoRsp* vTableInfo = taosArrayReserve(oTableInfo.cols, 1);
    STREAM_CHECK_NULL_GOTO(vTableInfo, terrno);
    // Zero the entry so that "cid == 0" below reliably means "column not found".
    memset(vTableInfo, 0, sizeof(*vTableInfo));

    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByName(&metaReader, oInfo->refTableName));
    vTableInfo->uid = metaReader.me.uid;
    stsDebug("vgId:%d %s uid:%"PRId64, TD_VID(pVnode), __func__, vTableInfo->uid);

    SSchemaWrapper* sSchemaWrapper = NULL;
    if (metaReader.me.type == TD_CHILD_TABLE) {
      // A child table's columns are described by its super table: re-read the entry by suid.
      int64_t suid = metaReader.me.ctbEntry.suid;
      vTableInfo->suid = suid;
      tDecoderClear(&metaReader.coder);
      STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, suid));
      sSchemaWrapper = &metaReader.me.stbEntry.schemaRow;
    } else if (metaReader.me.type == TD_NORMAL_TABLE) {
      vTableInfo->suid = 0;
      sSchemaWrapper = &metaReader.me.ntbEntry.schemaRow;
    } else {
      stError("invalid table type:%d", metaReader.me.type);
      // There is no schema to search; fail the request instead of dereferencing NULL below.
      STREAM_CHECK_CONDITION_GOTO(true, TSDB_CODE_INVALID_PARA);
    }

    for (size_t j = 0; j < sSchemaWrapper->nCols; j++) {
      SSchema* s = sSchemaWrapper->pSchema + j;
      if (strcmp(s->name, oInfo->refColName) == 0) {
        vTableInfo->cid = s->colId;
        break;
      }
    }
    if (vTableInfo->cid == 0) {
      stError("vgId:%d %s, not found col %s in table %s", TD_VID(pVnode), __func__, oInfo->refColName,
              oInfo->refTableName);
    }
    tDecoderClear(&metaReader.coder);
  }

  STREAM_CHECK_RET_GOTO(buildOTableInfoRsp(&oTableInfo, &buf, &size));

end:
  tDestroySTriggerOrigTableInfoRsp(&oTableInfo);
  api.metaReaderFn.clearReader(&metaReader);
  STREAM_PRINT_LOG_END(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  return code;
}
1905

1906
/*
 * Handle a STRIGGER_PULL_VTABLE_PSEUDO_COL request.
 *
 * Looks up the virtual table identified by req->virTablePseudoColReq.uid and builds a one-row data
 * block with one column per entry of req->virTablePseudoColReq.cids:
 *   - column id -1 is filled with the table name;
 *   - any other id is matched against the tag schema of the super table (virtual child tables only)
 *     and filled with the table's tag value; an id missing from the tag schema yields a NULL-typed,
 *     NULL-valued column.
 * The encoded block is sent back as a TDMT_STREAM_TRIGGER_PULL_RSP. A response is sent on every path,
 * including failures, carrying `code`.
 *
 * @param pVnode vnode holding the table meta
 * @param pMsg   request message; pMsg->info is reused for the response
 * @param req    decoded pull request
 * @return 0 on success, otherwise the error code that was also put into the response
 */
static int32_t vnodeProcessStreamVTableTagInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req) {
  int32_t      code = 0;
  int32_t      lino = 0;
  void*        buf = NULL;
  size_t       size = 0;
  SSDataBlock* pBlock = NULL;

  SMetaReader metaReader = {0};
  SMetaReader metaReaderStable = {0};
  // Not referenced by name below; presumably picked up by the stsDebug logging macro - confirm before renaming.
  int64_t     streamId = req->base.streamId;
  stsDebug("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStorageAPI api = {0};
  initStorageAPI(&api);

  SArray* cols = req->virTablePseudoColReq.cids;
  STREAM_CHECK_NULL_GOTO(cols, terrno);

  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
  STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, req->virTablePseudoColReq.uid));

  STREAM_CHECK_CONDITION_GOTO(metaReader.me.type != TD_VIRTUAL_CHILD_TABLE && metaReader.me.type != TD_VIRTUAL_NORMAL_TABLE, TSDB_CODE_INVALID_PARA);

  STREAM_CHECK_RET_GOTO(createDataBlock(&pBlock));
  if (metaReader.me.type == TD_VIRTUAL_NORMAL_TABLE) {
    // This branch only produces the table-name column, so the request must start with id -1.
    // `||` (not `&&`): an empty list must be rejected before its first element is dereferenced.
    STREAM_CHECK_CONDITION_GOTO(taosArrayGetSize(cols) < 1 || *(col_id_t*)taosArrayGet(cols, 0) != -1, TSDB_CODE_INVALID_PARA);
    SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN, -1);
    STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, 1));
    pBlock->info.rows = 1;
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, 0);
    STREAM_CHECK_NULL_GOTO(pDst, terrno);
    STREAM_CHECK_RET_GOTO(varColSetVarData(pDst, 0, metaReader.me.name, strlen(metaReader.me.name), false));
  } else if (metaReader.me.type == TD_VIRTUAL_CHILD_TABLE) {
    // The tag schema lives on the super table entry, which is read through a second reader.
    int64_t suid = metaReader.me.ctbEntry.suid;
    api.metaReaderFn.readerReleaseLock(&metaReader);
    api.metaReaderFn.initReader(&metaReaderStable, pVnode, META_READER_LOCK, &api.metaFn);

    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReaderStable, suid));
    SSchemaWrapper* sSchemaWrapper = &metaReaderStable.me.stbEntry.schemaTag;

    // Pass 1: declare one output column per requested id, in request order.
    for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
      col_id_t* id = taosArrayGet(cols, i);
      STREAM_CHECK_NULL_GOTO(id, terrno);
      if (*id == -1) {
        SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN, -1);
        STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
        continue;
      }
      size_t j = 0;
      for (; j < sSchemaWrapper->nCols; j++) {
        SSchema* s = sSchemaWrapper->pSchema + j;
        if (s->colId == *id) {
          SColumnInfoData idata = createColumnInfoData(s->type, s->bytes, s->colId);
          STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
          break;
        }
      }
      if (j == sSchemaWrapper->nCols) {
        // Unknown tag id: keep the column position but mark it NULL-typed so pass 2 writes a NULL.
        SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_NULL, CHAR_BYTES, *id);
        STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
      }
    }
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, 1));
    pBlock->info.rows = 1;

    // Pass 2: fill row 0 of every declared column.
    for (size_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
      STREAM_CHECK_NULL_GOTO(pDst, terrno);

      if (pDst->info.colId == -1) {
        STREAM_CHECK_RET_GOTO(varColSetVarData(pDst, 0, metaReader.me.name, strlen(metaReader.me.name), false));
        continue;
      }
      if (pDst->info.type == TSDB_DATA_TYPE_NULL) {
        STREAM_CHECK_RET_GOTO(colDataSetVal(pDst, 0, NULL, true));
        continue;
      }

      STagVal val = {0};
      val.cid = pDst->info.colId;
      const char* p = api.metaFn.extractTagVal(metaReader.me.ctbEntry.pTags, pDst->info.type, &val);

      bool  isJson = (pDst->info.type == TSDB_DATA_TYPE_JSON);
      char* data = NULL;
      if (!isJson && p != NULL) {
        data = tTagValToData((const STagVal*)p, false);
      } else {
        data = (char*)p;
      }

      // Same condition the original used to decide when `data` has to be released after the copy.
      bool ownsData = !isJson && (p != NULL) && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) && (data != NULL);
      bool isNull = (data == NULL) || (isJson && tTagIsJsonNull(data));

      // Release the temporary buffer before checking the result so a failed set does not leak it.
      int32_t setCode = colDataSetVal(pDst, 0, data, isNull);
      if (ownsData) {
        taosMemoryFree(data);
      }
      STREAM_CHECK_RET_GOTO(setCode);
    }
  } else {
    // Defensive: the table type was already validated above.
    stError("vgId:%d %s, invalid table type:%d", TD_VID(pVnode), __func__, metaReader.me.type);
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  stsDebug("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlock->info.rows);
  printDataBlock(pBlock, __func__, "");
  STREAM_CHECK_RET_GOTO(buildRsp(pBlock, &buf, &size));

end:
  api.metaReaderFn.clearReader(&metaReaderStable);
  api.metaReaderFn.clearReader(&metaReader);
  STREAM_PRINT_LOG_END(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  blockDataDestroy(pBlock);
  return code;
}
2023

2024
/*
 * Handle a TDMT_STREAM_FETCH message.
 *
 * Decodes the fetch request, looks up the calc info for (queryId, taskId, execId), (re)creates the
 * executor task when the request asks for a reset or no task exists yet, executes it, optionally
 * filters the produced blocks, and sends the encoded result as a TDMT_STREAM_FETCH_RSP.
 * A response is sent on every path, including failures, carrying `code`.
 *
 * @param pVnode vnode the task runs on
 * @param pMsg   request message; pMsg->pCont/contLen hold the serialized SResFetchReq and pMsg->info
 *               is reused for the response
 * @return 0 on success, otherwise the error code that was also put into the response
 */
static int32_t vnodeProcessStreamFetchMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;
  void*   taskAddr = NULL;
  SArray* pResList = NULL;

  SResFetchReq req = {0};
  STREAM_CHECK_CONDITION_GOTO(tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0,
                              TSDB_CODE_QRY_INVALID_INPUT);
  SArray* calcInfoList = (SArray*)qStreamGetReaderInfo(req.queryId, req.taskId, &taskAddr);
  STREAM_CHECK_NULL_GOTO(calcInfoList, terrno);

  STREAM_CHECK_CONDITION_GOTO(req.execId < 0, TSDB_CODE_INVALID_PARA);
  SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo = taosArrayGetP(calcInfoList, req.execId);
  STREAM_CHECK_NULL_GOTO(sStreamReaderCalcInfo, terrno);
  // Not referenced by name below; presumably picked up by the ST_TASK_DLOG macro - confirm before renaming.
  void* pTask = sStreamReaderCalcInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, execId:%d, reset:%d, pTaskInfo:%p, scan type:%d", TD_VID(pVnode), __func__, req.execId, req.reset,
               sStreamReaderCalcInfo->pTaskInfo, nodeType(sStreamReaderCalcInfo->calcAst->pNode));

  if (req.reset || sStreamReaderCalcInfo->pTaskInfo == NULL) {
    // The rebuild path dereferences the runtime function info; reject a request without it
    // before tearing down the existing task.
    STREAM_CHECK_NULL_GOTO(req.pStRtFuncInfo, TSDB_CODE_INVALID_PARA);

    qDestroyTask(sStreamReaderCalcInfo->pTaskInfo);
    // Clear the handle right away: any failure before the task is recreated must not leave a
    // dangling pointer behind for the next request to reuse or destroy again.
    sStreamReaderCalcInfo->pTaskInfo = NULL;

    int64_t uid = 0;
    if (req.dynTbname) {
      // Use the uid of the first partition value flagged as a table name.
      SArray* vals = req.pStRtFuncInfo->pStreamPartColVals;
      for (size_t i = 0; i < taosArrayGetSize(vals); ++i) {
        SStreamGroupValue* pValue = taosArrayGet(vals, i);
        if (pValue != NULL && pValue->isTbname) {
          uid = pValue->uid;
          break;
        }
      }
    }

    SReadHandle handle = {0};
    handle.vnode = pVnode;
    handle.uid = uid;

    initStorageAPI(&handle.api);
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(sStreamReaderCalcInfo->calcAst->pNode) ||
        QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == nodeType(sStreamReaderCalcInfo->calcAst->pNode)) {
      STimeRangeNode* node = (STimeRangeNode*)((STableScanPhysiNode*)(sStreamReaderCalcInfo->calcAst->pNode))->pTimeRange;
      if (node != NULL) {
        STREAM_CHECK_RET_GOTO(processCalaTimeRange(sStreamReaderCalcInfo, &req, node, &handle));
      }
    }

    // Swap the request's function info into the calc info; the previous contents end up in
    // `req` and are handed to tDestroySResFetchReq below.
    TSWAP(sStreamReaderCalcInfo->rtInfo.funcInfo, *req.pStRtFuncInfo);
    handle.streamRtInfo = &sStreamReaderCalcInfo->rtInfo;

    STREAM_CHECK_RET_GOTO(qCreateStreamExecTaskInfo(&sStreamReaderCalcInfo->pTaskInfo,
                                                    sStreamReaderCalcInfo->calcScanPlan, &handle, NULL, TD_VID(pVnode),
                                                    req.taskId));

    STREAM_CHECK_RET_GOTO(qSetTaskId(sStreamReaderCalcInfo->pTaskInfo, req.taskId, req.queryId));
  }

  if (req.pOpParam != NULL) {
    qUpdateOperatorParam(sStreamReaderCalcInfo->pTaskInfo, req.pOpParam);
  }

  pResList = taosArrayInit(4, POINTER_BYTES);
  STREAM_CHECK_NULL_GOTO(pResList, terrno);
  uint64_t ts = 0;
  bool     hasNext = false;
  STREAM_CHECK_RET_GOTO(qExecTaskOpt(sStreamReaderCalcInfo->pTaskInfo, pResList, &ts, &hasNext, NULL, req.pOpParam != NULL));

  for (size_t i = 0; i < taosArrayGetSize(pResList); i++) {
    SSDataBlock* pBlock = taosArrayGetP(pResList, i);
    if (pBlock == NULL) continue;
    printDataBlock(pBlock, __func__, "fetch");
    if (sStreamReaderCalcInfo->rtInfo.funcInfo.withExternalWindow) {
      STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, sStreamReaderCalcInfo->pFilterInfo));
      printDataBlock(pBlock, __func__, "fetch filter");
    }
  }

  STREAM_CHECK_RET_GOTO(streamBuildFetchRsp(pResList, hasNext, &buf, &size, pVnode->config.tsdbCfg.precision));
  ST_TASK_DLOG("vgId:%d %s end:", TD_VID(pVnode), __func__);

end:
  taosArrayDestroy(pResList);
  streamReleaseTask(taskAddr);

  STREAM_PRINT_LOG_END(code, lino);
  SRpcMsg rsp = {.msgType = TDMT_STREAM_FETCH_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  tDestroySResFetchReq(&req);
  return code;
}
2119

2120
/*
 * Entry point for messages taken from the stream reader queue.
 *
 * - If the vnode is not ready for reads, the message is redirected and 0 is returned.
 * - TDMT_STREAM_FETCH is forwarded to vnodeProcessStreamFetchMsg.
 * - TDMT_STREAM_TRIGGER_PULL is decoded (payload starts after the SMsgHead) and dispatched on
 *   req.base.type to the matching handler.
 * - Any other message type, or an unknown pull type, yields TSDB_CODE_APP_ERROR.
 *
 * @param pVnode vnode processing the message
 * @param pMsg   message taken from the queue
 * @return 0 on success or redirect, otherwise the error code of the failing step/handler
 */
int32_t vnodeProcessStreamReaderMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  int32_t                   code = 0;
  int32_t                   lino = 0;
  SSTriggerPullRequestUnion req = {0};
  void*                     taskAddr = NULL;
  void*                     pBody = NULL;
  int32_t                   bodyLen = 0;
  SStreamTriggerReaderInfo* pReaderInfo = NULL;

  vDebug("vgId:%d, msg:%p in stream reader queue is processing", pVnode->config.vgId, pMsg);
  if (!syncIsReadyForRead(pVnode->sync)) {
    vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
    return 0;
  }

  // Fetch messages have their own handler, which does its own cleanup and response.
  if (pMsg->msgType == TDMT_STREAM_FETCH) {
    return vnodeProcessStreamFetchMsg(pVnode, pMsg);
  }

  if (pMsg->msgType != TDMT_STREAM_TRIGGER_PULL) {
    vError("unknown msg type:%d in stream reader queue", pMsg->msgType);
    code = TSDB_CODE_APP_ERROR;
    goto end;
  }

  // The serialized pull request follows the message head.
  pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  bodyLen = pMsg->contLen - sizeof(SMsgHead);
  STREAM_CHECK_RET_GOTO(tDeserializeSTriggerPullRequest(pBody, bodyLen, &req));
  stDebug("vgId:%d %s start, type:%d, streamId:%" PRIx64 ", readerTaskId:%" PRIx64 ", sessionId:%" PRIx64,
          TD_VID(pVnode), __func__, req.base.type, req.base.streamId, req.base.readerTaskId, req.base.sessionId);

  // Every pull type except STRIGGER_PULL_OTABLE_INFO gets the reader info looked up first.
  if (STRIGGER_PULL_OTABLE_INFO != req.base.type) {
    pReaderInfo = qStreamGetReaderInfo(req.base.streamId, req.base.readerTaskId, &taskAddr);
  }

  switch (req.base.type) {
    case STRIGGER_PULL_SET_TABLE:
      code = vnodeProcessStreamSetTableReq(pVnode, pMsg, &req, pReaderInfo);
      break;

    case STRIGGER_PULL_LAST_TS:
      code = vnodeProcessStreamLastTsReq(pVnode, pMsg, &req, pReaderInfo);
      break;

    case STRIGGER_PULL_FIRST_TS:
      code = vnodeProcessStreamFirstTsReq(pVnode, pMsg, &req, pReaderInfo);
      break;

    case STRIGGER_PULL_TSDB_META:
    case STRIGGER_PULL_TSDB_META_NEXT:
      code = vnodeProcessStreamTsdbMetaReq(pVnode, pMsg, &req, pReaderInfo);
      break;

    case STRIGGER_PULL_TSDB_TS_DATA:
      code = vnodeProcessStreamTsdbTsDataReq(pVnode, pMsg, &req, pReaderInfo);
      break;

    case STRIGGER_PULL_TSDB_TRIGGER_DATA:
    case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT:
      code = vnodeProcessStreamTsdbTriggerDataReq(pVnode, pMsg, &req, pReaderInfo);
      break;

    case STRIGGER_PULL_TSDB_CALC_DATA:
    case STRIGGER_PULL_TSDB_CALC_DATA_NEXT:
      code = vnodeProcessStreamTsdbCalcDataReq(pVnode, pMsg, &req, pReaderInfo);
      break;

    case STRIGGER_PULL_TSDB_DATA:
    case STRIGGER_PULL_TSDB_DATA_NEXT:
      code = vnodeProcessStreamTsdbVirtalDataReq(pVnode, pMsg, &req, pReaderInfo);
      break;

    case STRIGGER_PULL_WAL_META:
      code = vnodeProcessStreamWalMetaReq(pVnode, pMsg, &req, pReaderInfo);
      break;

    case STRIGGER_PULL_WAL_TS_DATA:
    case STRIGGER_PULL_WAL_TRIGGER_DATA:
    case STRIGGER_PULL_WAL_CALC_DATA:
    case STRIGGER_PULL_WAL_DATA:
      code = vnodeProcessStreamWalDataReq(pVnode, pMsg, &req, pReaderInfo);
      break;

    case STRIGGER_PULL_GROUP_COL_VALUE:
      code = vnodeProcessStreamGroupColValueReq(pVnode, pMsg, &req, pReaderInfo);
      break;

    case STRIGGER_PULL_VTABLE_INFO:
      code = vnodeProcessStreamVTableInfoReq(pVnode, pMsg, &req, pReaderInfo);
      break;

    case STRIGGER_PULL_VTABLE_PSEUDO_COL:
      code = vnodeProcessStreamVTableTagInfoReq(pVnode, pMsg, &req);
      break;

    case STRIGGER_PULL_OTABLE_INFO:
      code = vnodeProcessStreamOTableInfoReq(pVnode, pMsg, &req);
      break;

    default:
      vError("unknown inner msg type:%d in stream reader queue", req.base.type);
      code = TSDB_CODE_APP_ERROR;
      break;
  }

end:
  streamReleaseTask(taskAddr);
  tDestroySTriggerPullRequest(&req);
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc