• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4754

25 Sep 2025 05:58AM UTC coverage: 57.946% (-1.0%) from 58.977%
#4754

push

travis-ci

web-flow
enh: taos command line support '-uroot' on windows (#33055)

133189 of 293169 branches covered (45.43%)

Branch coverage included in aggregate %.

201677 of 284720 relevant lines covered (70.83%)

5398749.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.64
/source/dnode/vnode/src/vnd/vnodeStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "scalar.h"
17
#include "streamReader.h"
18
#include "tdatablock.h"
19
#include "tdb.h"
20
#include "tencode.h"
21
#include "tglobal.h"
22
#include "tmsg.h"
23
#include "vnd.h"
24
#include "vnode.h"
25
#include "vnodeInt.h"
26

27
/*
 * BUILD_OPTION declares a local variable named `options` of type
 * SStreamTriggerReaderTaskInnerOptions and initialises it from the reader
 * info pointer `sStreamInfo` plus the per-call arguments.
 *
 *  - `.suid` is copied from sStreamInfo only when `_uidList` is NULL; when an
 *    explicit uid list is supplied it is set to 0.
 *  - The expansion already ends with ';', so it forms a complete declaration.
 *  - Every expression argument is parenthesised so that compound expressions
 *    (e.g. a conditional passed as `_uidList`) expand with the intended
 *    precedence.
 */
#define BUILD_OPTION(options, sStreamInfo, _ver, _groupSort, _order, startTime, endTime, _schemas, _isSchema, _scanMode, \
                     _gid, _initReader, _uidList)                                                                        \
  SStreamTriggerReaderTaskInnerOptions options = {.suid = ((_uidList) == NULL ? (sStreamInfo)->suid : 0),               \
                                                  .uid = (sStreamInfo)->uid,                                             \
                                                  .ver = (_ver),                                                         \
                                                  .tableType = (sStreamInfo)->tableType,                                 \
                                                  .groupSort = (_groupSort),                                             \
                                                  .order = (_order),                                                     \
                                                  .twindows = {.skey = (startTime), .ekey = (endTime)},                  \
                                                  .pTagCond = (sStreamInfo)->pTagCond,                                   \
                                                  .pTagIndexCond = (sStreamInfo)->pTagIndexCond,                         \
                                                  .pConditions = (sStreamInfo)->pConditions,                             \
                                                  .partitionCols = (sStreamInfo)->partitionCols,                         \
                                                  .schemas = (_schemas),                                                 \
                                                  .isSchema = (_isSchema),                                               \
                                                  .scanMode = (_scanMode),                                               \
                                                  .gid = (_gid),                                                         \
                                                  .initReader = (_initReader),                                           \
                                                  .uidList = (_uidList)};
46

47
/* Packs `type` into the bits above bit 31 and ORs it with `session` to form one 64-bit key. */
static int64_t getSessionKey(int64_t session, int64_t type) {
  const int64_t typeBits = type << 32;
  return session | typeBits;
}
29,139✔
48

49
static int32_t buildScheamFromMeta(SVnode* pVnode, int64_t uid, SArray** schemas);
50

51
/*
 * Copies one fixed-width value into column `index` of pResBlock, at the row
 * slot given by the block's current row count (info.rows).
 *
 * The number of bytes copied is the column's info.bytes. The row count itself
 * is not modified here.
 *
 * Returns 0 on success, or terrno when the column lookup yields NULL.
 */
static int32_t addColData(SSDataBlock* pResBlock, int32_t index, void* data) {
  SColumnInfoData* pDstCol = taosArrayGet(pResBlock->pDataBlock, index);
  if (pDstCol != NULL) {
    memcpy(pDstCol->pData + pResBlock->info.rows * pDstCol->info.bytes, data, pDstCol->info.bytes);
    return 0;
  }

  return terrno;
}
60

61
/*
 * Advances the task's reader via tsdNextDataBlock and reports through
 * `hasNext` whatever that call stores there.
 *
 * If the advance fails, tsdReaderReleaseDataBlock is called on the reader
 * before the error code is returned.
 */
static int32_t getTableDataInfo(SStreamReaderTaskInner* pTask, bool* hasNext) {
  int32_t rc = pTask->api.tsdReader.tsdNextDataBlock(pTask->pReader, hasNext);
  if (rc == TSDB_CODE_SUCCESS) {
    return rc;
  }

  pTask->api.tsdReader.tsdReaderReleaseDataBlock(pTask->pReader);
  return rc;
}
69

70
/* Thin wrapper: forwards to tsdReaderRetrieveDataBlock with a NULL third argument and returns its result. */
static int32_t getTableData(SStreamReaderTaskInner* pTask, SSDataBlock** ppRes) {
  int32_t rc = pTask->api.tsdReader.tsdReaderRetrieveDataBlock(pTask->pReader, ppRes, NULL);
  return rc;
}
73

74
static int32_t buildOTableInfoRsp(const SSTriggerOrigTableInfoRsp* rsp, void** data, size_t* size) {
60✔
75
  int32_t code = 0;
60✔
76
  int32_t lino = 0;
60✔
77
  void*   buf = NULL;
60✔
78
  int32_t len = tSerializeSTriggerOrigTableInfoRsp(NULL, 0, rsp);
60✔
79
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
60!
80
  buf = rpcMallocCont(len);
60✔
81
  STREAM_CHECK_NULL_GOTO(buf, terrno);
60!
82
  int32_t actLen = tSerializeSTriggerOrigTableInfoRsp(buf, len, rsp);
60✔
83
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
60!
84
  *data = buf;
60✔
85
  *size = len;
60✔
86
  buf = NULL;
60✔
87
end:
60✔
88
  rpcFreeCont(buf);
60✔
89
  return code;
60✔
90
}
91

92
static int32_t buildVTableInfoRsp(const SStreamMsgVTableInfo* rsp, void** data, size_t* size) {
192✔
93
  int32_t code = 0;
192✔
94
  int32_t lino = 0;
192✔
95
  void*   buf = NULL;
192✔
96
  int32_t len = tSerializeSStreamMsgVTableInfo(NULL, 0, rsp);
192✔
97
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
192!
98
  buf = rpcMallocCont(len);
192✔
99
  STREAM_CHECK_NULL_GOTO(buf, terrno);
192!
100
  int32_t actLen = tSerializeSStreamMsgVTableInfo(buf, len, rsp);
192✔
101
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
192!
102
  *data = buf;
192✔
103
  *size = len;
192✔
104
  buf = NULL;
192✔
105
end:
192✔
106
  rpcFreeCont(buf);
192✔
107
  return code;
192✔
108
}
109

110
static int32_t buildTsRsp(const SStreamTsResponse* tsRsp, void** data, size_t* size) {
708✔
111
  int32_t code = 0;
708✔
112
  int32_t lino = 0;
708✔
113
  void*   buf = NULL;
708✔
114
  int32_t len = tSerializeSStreamTsResponse(NULL, 0, tsRsp);
708✔
115
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
707!
116
  buf = rpcMallocCont(len);
707✔
117
  STREAM_CHECK_NULL_GOTO(buf, terrno);
708!
118
  int32_t actLen = tSerializeSStreamTsResponse(buf, len, tsRsp);
708✔
119
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
708!
120
  *data = buf;
708✔
121
  *size = len;
708✔
122
  buf = NULL;
708✔
123
end:
708✔
124
  rpcFreeCont(buf);
708✔
125
  return code;
708✔
126
}
127

128

129
/*
 * Encodes a data block into a buffer obtained from rpcMallocCont.
 *
 * A NULL block or a block with zero rows is treated as success and leaves
 * *data / *size untouched. Otherwise, on success, *data receives the buffer
 * and *size the value returned by blockGetEncodeSize (not the length returned
 * by blockEncode).
 *
 * Returns 0 on success, or terrno when allocation fails or blockEncode
 * returns a negative length.
 */
static int32_t buildRsp(SSDataBlock* pBlock, void** data, size_t* size) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   pCont = NULL;

  // Nothing to encode: report success without producing a buffer.
  STREAM_CHECK_CONDITION_GOTO(pBlock == NULL || pBlock->info.rows == 0, TSDB_CODE_SUCCESS);

  size_t encodeCap = blockGetEncodeSize(pBlock);
  pCont = rpcMallocCont(encodeCap);
  STREAM_CHECK_NULL_GOTO(pCont, terrno);

  int32_t encodedLen = blockEncode(pBlock, pCont, encodeCap, taosArrayGetSize(pBlock->pDataBlock));
  STREAM_CHECK_CONDITION_GOTO(encodedLen < 0, terrno);

  *size = encodeCap;
  *data = pCont;
  pCont = NULL;  // handed to the caller

end:
  rpcFreeCont(pCont);
  return code;
}
146

147
/*
 * Re-targets the task's reader at the table list of the current group
 * (pTask->currentGroupIndex) and resets its status.
 *
 * Steps, in order: fetch the group's table keys, install them on the reader,
 * rebuild pTask->cond from the task options, then reset the reader status
 * with the rebuilt condition. The first failing step aborts the sequence.
 *
 * Returns 0 on success or the failing step's error code (also logged).
 */
static int32_t resetTsdbReader(SStreamReaderTaskInner* pTask) {
  int32_t        code = 0;
  int32_t        lino = 0;
  STableKeyInfo* pKeys = NULL;
  int32_t        numOfKeys = 1;

  STREAM_CHECK_RET_GOTO(qStreamGetTableList(pTask->pTableList, pTask->currentGroupIndex, &pKeys, &numOfKeys));
  STREAM_CHECK_RET_GOTO(pTask->api.tsdReader.tsdSetQueryTableList(pTask->pReader, pKeys, numOfKeys));

  // Drop the previous query condition before building a new one in place.
  cleanupQueryTableDataCond(&pTask->cond);
  STREAM_CHECK_RET_GOTO(qStreamInitQueryTableDataCond(&pTask->cond, pTask->options.order, pTask->options.schemas, true,
                                                      pTask->options.twindows, pTask->options.suid, pTask->options.ver));

  STREAM_CHECK_RET_GOTO(pTask->api.tsdReader.tsdReaderResetStatus(pTask->pReader, &pTask->cond));

end:
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
164

165
/*
 * Writes one WAL meta record into pBlock at the block's current row slot.
 *
 * Column order: type, gid (omitted when isVTable is true), uid, skey, ekey,
 * ver, rows. Each value is stored through addColData; the first failure stops
 * the sequence. The block's row count is not advanced here.
 *
 * Returns 0 on success or the error code from addColData (also logged).
 */
static int32_t buildWalMetaBlock(SSDataBlock* pBlock, int8_t type, int64_t gid, bool isVTable, int64_t uid,
                                 int64_t skey, int64_t ekey, int64_t ver, int64_t rows) {
  int32_t code = 0;
  int32_t lino = 0;

  // Collect the cell values in column order, then store them one by one.
  void*   cells[7];
  int32_t numOfCells = 0;
  cells[numOfCells++] = &type;
  if (!isVTable) {
    cells[numOfCells++] = &gid;
  }
  cells[numOfCells++] = &uid;
  cells[numOfCells++] = &skey;
  cells[numOfCells++] = &ekey;
  cells[numOfCells++] = &ver;
  cells[numOfCells++] = &rows;

  for (int32_t col = 0; col < numOfCells; col++) {
    STREAM_CHECK_RET_GOTO(addColData(pBlock, col, cells[col]));
  }

end:
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
184

185
/*
 * Fills pTSchema as a single-column schema: sets numOfCols to 1, the schema
 * version to `ver`, and column 0's id, type and byte width from the arguments.
 * The caller must provide storage for at least one column entry.
 */
static void buildTSchema(STSchema* pTSchema, int32_t ver, col_id_t colId, int8_t type, int32_t bytes) {
  pTSchema->version = ver;
  pTSchema->numOfCols = 1;

  STColumn* pOnlyCol = &pTSchema->columns[0];
  pOnlyCol->colId = colId;
  pOnlyCol->type = type;
  pOnlyCol->bytes = bytes;
}
4,775✔
192

193
/*
 * Appends one WAL_SUBMIT_DATA meta row to pBlock describing a submitted table
 * batch: uid, group id, first/last timestamp, WAL version and row count.
 *
 * Tables not present in pTableList are skipped and the function returns
 * success without touching pBlock. For virtual tables pTableList is looked up
 * as a hash keyed by uid; otherwise it is queried through qStreamUidInTableList
 * and the group id is taken from qStreamGetGroupId.
 *
 * The first and last timestamps are read from row 0 and row (numOfRows - 1)
 * of the batch, from the first column (column format) or through a temporary
 * single-column timestamp schema (row format).
 *
 * On success pBlock->info.rows is incremented by one.
 * Returns 0 on success/skip or an error code (also logged).
 */
int32_t retrieveWalMetaData(SSubmitTbData* pSubmitTbData, void* pTableList, bool isVTable, SSDataBlock* pBlock,
                            int64_t ver) {
  int32_t code = 0;
  int32_t lino = 0;

  STSchema* pTsSchema = NULL;  // temporary schema for the row format, freed at `end`
  int64_t   uid = pSubmitTbData->uid;
  uint64_t  groupId = 0;
  int32_t   rowCnt = 0;
  int64_t   firstTs = 0;
  int64_t   lastTs = 0;

  if (!isVTable) {
    STREAM_CHECK_CONDITION_GOTO(!qStreamUidInTableList(pTableList, uid), TDB_CODE_SUCCESS);
    groupId = qStreamGetGroupId(pTableList, uid);
  } else {
    STREAM_CHECK_CONDITION_GOTO(taosHashGet(pTableList, &uid, sizeof(uid)) == NULL, TDB_CODE_SUCCESS);
  }

  if ((pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) == 0) {
    // Row format: decode only the timestamp column of the first and last row.
    rowCnt = taosArrayGetSize(pSubmitTbData->aRowP);
    SRow* pRow = taosArrayGetP(pSubmitTbData->aRowP, 0);
    STREAM_CHECK_NULL_GOTO(pRow, terrno);

    SColVal tsVal = {0};
    pTsSchema = taosMemoryCalloc(1, sizeof(STSchema) + sizeof(STColumn));
    STREAM_CHECK_NULL_GOTO(pTsSchema, terrno);
    buildTSchema(pTsSchema, pSubmitTbData->sver, PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES);

    STREAM_CHECK_RET_GOTO(tRowGet(pRow, pTsSchema, 0, &tsVal));
    firstTs = VALUE_GET_TRIVIAL_DATUM(&tsVal.value);

    pRow = taosArrayGetP(pSubmitTbData->aRowP, rowCnt - 1);
    STREAM_CHECK_RET_GOTO(tRowGet(pRow, pTsSchema, 0, &tsVal));
    lastTs = VALUE_GET_TRIVIAL_DATUM(&tsVal.value);
  } else {
    // Column format: column 0 supplies the row count and both timestamps.
    SColData* pTsCol = taosArrayGet(pSubmitTbData->aCol, 0);
    STREAM_CHECK_NULL_GOTO(pTsCol, terrno);
    rowCnt = pTsCol->nVal;

    SColVal tsVal = {0};
    STREAM_CHECK_RET_GOTO(tColDataGetValue(pTsCol, 0, &tsVal));
    firstTs = VALUE_GET_TRIVIAL_DATUM(&tsVal.value);

    STREAM_CHECK_RET_GOTO(tColDataGetValue(pTsCol, rowCnt - 1, &tsVal));
    lastTs = VALUE_GET_TRIVIAL_DATUM(&tsVal.value);
  }

  STREAM_CHECK_RET_GOTO(
      buildWalMetaBlock(pBlock, WAL_SUBMIT_DATA, groupId, isVTable, uid, firstTs, lastTs, ver, rowCnt));
  pBlock->info.rows++;
  stDebug("stream reader scan submit data:uid %" PRIu64 ", skey %" PRIu64 ", ekey %" PRIu64 ", gid %" PRIu64
          ", rows:%d",
          uid, firstTs, lastTs, groupId, rowCnt);

end:
  taosMemoryFree(pTsSchema);
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
248

249
/*
 * Materialises the rows of one submitted table batch into pBlock.
 *
 * pVnode         vnode whose meta store provides the table schema for
 *                (uid, sver) of the batch.
 * pSubmitTbData  the decoded batch, in either column or row format.
 * pBlock         destination block; its columns are matched to source columns
 *                by column id. pBlock->info.rows is set to the number of rows
 *                written.
 * window         optional time filter; when non-NULL only rows with
 *                skey <= ts <= ekey are kept.
 *
 * Returns 0 on success, TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND when no schema is
 * available, or the first error reported by a helper (also logged). On an
 * error or early exit pBlock->info.rows is left as it was.
 *
 * Fix over the previous version: in the column-format branch the destination
 * row index is now relative to the first selected row (k - rowStart). Before,
 * the source index k was used as the destination index although capacity was
 * only ensured for (rowEnd - rowStart) rows, so a window that skipped leading
 * rows wrote to the wrong rows and possibly past the ensured capacity.
 */
int32_t retrieveWalData(SVnode* pVnode, SSubmitTbData* pSubmitTbData, SSDataBlock* pBlock, STimeWindow* window) {
  stDebug("stream reader retrieve data block %p", pSubmitTbData);
  int32_t code = 0;
  int32_t lino = 0;

  int32_t ver = pSubmitTbData->sver;
  int64_t uid = pSubmitTbData->uid;
  int32_t numOfRows = 0;

  STSchema* schemas = metaGetTbTSchema(pVnode->pMeta, uid, ver, 1);
  STREAM_CHECK_NULL_GOTO(schemas, TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND);
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    // Column format: column 0 carries the timestamps used for window selection.
    SColData* pTsCol = taosArrayGet(pSubmitTbData->aCol, 0);
    STREAM_CHECK_NULL_GOTO(pTsCol, terrno);
    int32_t rowStart = 0;
    int32_t rowEnd = pTsCol->nVal;
    if (window != NULL) {
      SColVal tsVal = {0};
      rowStart = -1;
      rowEnd = -1;
      // rowStart = first row with ts >= skey; rowEnd = first row with ts > ekey (exclusive bound).
      for (int32_t k = 0; k < pTsCol->nVal; k++) {
        STREAM_CHECK_RET_GOTO(tColDataGetValue(pTsCol, k, &tsVal));
        int64_t ts = VALUE_GET_TRIVIAL_DATUM(&tsVal.value);
        if (ts >= window->skey && rowStart == -1) {
          rowStart = k;
        }
        if (ts > window->ekey && rowEnd == -1) {
          rowEnd = k;
        }
      }
      // No row at or after skey, or the selected range is empty: nothing to output.
      STREAM_CHECK_CONDITION_GOTO(rowStart == -1 || rowStart == rowEnd, TDB_CODE_SUCCESS);

      // No row beyond ekey was found, so the range extends to the last row.
      if (rowStart != -1 && rowEnd == -1) {
        rowEnd = pTsCol->nVal;
      }
    }
    numOfRows = rowEnd - rowStart;
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, numOfRows));
    for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
      STREAM_CHECK_NULL_GOTO(pColData, terrno);
      if (i >= taosArrayGetSize(pSubmitTbData->aCol)) {
        break;
      }
      for (int32_t j = 0; j < taosArrayGetSize(pSubmitTbData->aCol); j++) {
        SColData* pSrcCol = taosArrayGet(pSubmitTbData->aCol, j);
        STREAM_CHECK_NULL_GOTO(pSrcCol, terrno);
        if (pSrcCol->cid == pColData->info.colId) {
          for (int32_t k = rowStart; k < rowEnd; k++) {
            SColVal colVal = {0};
            STREAM_CHECK_RET_GOTO(tColDataGetValue(pSrcCol, k, &colVal));
            // Destination rows start at 0, so shift by rowStart.
            STREAM_CHECK_RET_GOTO(colDataSetVal(pColData, k - rowStart,
                                                VALUE_GET_DATUM(&colVal.value, colVal.value.type),
                                                !COL_VAL_IS_VALUE(&colVal)));
          }
        }
      }
    }
  } else {
    // Row format: filter each row by timestamp, then copy matching columns.
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, taosArrayGetSize(pSubmitTbData->aRowP)));
    for (int32_t j = 0; j < taosArrayGetSize(pSubmitTbData->aRowP); j++) {
      SRow* pRow = taosArrayGetP(pSubmitTbData->aRowP, j);
      STREAM_CHECK_NULL_GOTO(pRow, terrno);
      SColVal tsVal = {0};
      STREAM_CHECK_RET_GOTO(tRowGet(pRow, schemas, 0, &tsVal));
      int64_t ts = VALUE_GET_TRIVIAL_DATUM(&tsVal.value);
      if (window != NULL && (ts < window->skey || ts > window->ekey)) {
        continue;
      }
      for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {  // reader todo test null
        if (i >= schemas->numOfCols) {
          break;
        }
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
        STREAM_CHECK_NULL_GOTO(pColData, terrno);
        SColVal colVal = {0};
        int32_t sourceIdx = 0;
        // Advance through the source columns until the column id is no longer below the target's.
        while (1) {
          STREAM_CHECK_RET_GOTO(tRowGet(pRow, schemas, sourceIdx, &colVal));
          if (colVal.cid < pColData->info.colId) {
            sourceIdx++;
            continue;
          } else {
            break;
          }
        }
        if (colVal.cid == pColData->info.colId && COL_VAL_IS_VALUE(&colVal)) {
          if (IS_VAR_DATA_TYPE(colVal.value.type) || colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
            STREAM_CHECK_RET_GOTO(varColSetVarData(pColData, numOfRows, colVal.value.pData, colVal.value.nData,
                                                   !COL_VAL_IS_VALUE(&colVal)));
          } else {
            STREAM_CHECK_RET_GOTO(colDataSetVal(pColData, numOfRows, (const char*)(&(colVal.value.val)),
                                                !COL_VAL_IS_VALUE(&colVal)));
          }
        } else {
          // Column missing from the row, or present without a value.
          colDataSetNULL(pColData, numOfRows);
        }
      }
      numOfRows++;
    }
  }

  pBlock->info.rows = numOfRows;

end:
  taosMemoryFree(schemas);
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
355

356
/*
 * Appends one WAL_DELETE_DATA meta row for table `uid` to pBlock, using the
 * delete request's skey/ekey and a row count of 1.
 *
 * Tables that are not tracked are skipped with success: for virtual tables
 * that means uid is absent from the pTableList hash; otherwise it means
 * qStreamGetGroupId yields a value that compares equal to -1.
 *
 * On success pBlock->info.rows is incremented by one.
 */
static int32_t buildDeleteData(void* pTableList, bool isVTable, SSDataBlock* pBlock, SDeleteRes* req, int64_t uid, int64_t ver){
  int32_t  code = 0;
  int32_t  lino = 0;
  uint64_t groupId = 0;

  stDebug("stream reader scan delete start data:uid %" PRIu64 ", skey %" PRIu64 ", ekey %" PRIu64, uid, req->skey, req->ekey);

  if (!isVTable) {
    groupId = qStreamGetGroupId(pTableList, uid);
    STREAM_CHECK_CONDITION_GOTO(groupId == -1, TDB_CODE_SUCCESS);
  } else {
    STREAM_CHECK_CONDITION_GOTO(taosHashGet(pTableList, &uid, sizeof(uid)) == NULL, TDB_CODE_SUCCESS);
  }

  STREAM_CHECK_RET_GOTO(
      buildWalMetaBlock(pBlock, WAL_DELETE_DATA, groupId, isVTable, uid, req->skey, req->ekey, ver, 1));
  pBlock->info.rows++;

  stDebug("stream reader scan delete end data:uid %" PRIu64 ", skey %" PRIu64 ", ekey %" PRIu64, uid, req->skey, req->ekey);

end:
  return code;
}
375

376
/*
 * Decodes a delete request from a WAL message body and appends one delete
 * meta row to pBlock for every uid listed in the request (see
 * buildDeleteData for the per-table filtering).
 *
 * data/len  encoded request body.
 * ver       WAL version recorded in each emitted row.
 *
 * Returns 0 on success, terrno if the uid list cannot be allocated or an
 * element lookup fails, or the error from decoding / row building.
 *
 * Fix over the previous version: the result of taosArrayInit is now checked,
 * so an allocation failure is reported instead of decoding into a request
 * with a NULL uid list.
 */
static int32_t scanDeleteData(void* pTableList, bool isVTable, SSDataBlock* pBlock, void* data, int32_t len,
                              int64_t ver) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SDecoder   decoder = {0};
  SDeleteRes req = {0};

  // Initialise the decoder first so the cleanup at `end` always sees an initialised decoder.
  tDecoderInit(&decoder, data, len);

  req.uidList = taosArrayInit(0, sizeof(tb_uid_t));
  STREAM_CHECK_NULL_GOTO(req.uidList, terrno);

  STREAM_CHECK_RET_GOTO(tDecodeDeleteRes(&decoder, &req));

  for (int32_t i = 0; i < taosArrayGetSize(req.uidList); i++) {
    uint64_t* uid = taosArrayGet(req.uidList, i);
    STREAM_CHECK_NULL_GOTO(uid, terrno);
    STREAM_CHECK_RET_GOTO(buildDeleteData(pTableList, isVTable, pBlock, &req, *uid, ver));
  }

end:
  taosArrayDestroy(req.uidList);
  tDecoderClear(&decoder);
  return code;
}
397

398
/*
 * Decodes a drop-table batch request from a WAL message body and appends one
 * WAL_DELETE_TABLE meta row to pBlock for each dropped table that is tracked.
 *
 * A table is skipped when, for virtual tables, its uid is absent from the
 * pTableList hash, or otherwise when qStreamGetGroupId yields a value that
 * compares equal to -1. Emitted rows carry skey = ekey = 0 and a row count
 * of 1; pBlock->info.rows is incremented per emitted row.
 *
 * Returns 0 on success or the first decode / build error.
 */
static int32_t scanDropTable(void* pTableList, bool isVTable, SSDataBlock* pBlock, void* data, int32_t len,
                             int64_t ver) {
  int32_t          code = 0;
  int32_t          lino = 0;
  SDecoder         decoder = {0};
  SVDropTbBatchReq batch = {0};

  tDecoderInit(&decoder, data, len);
  STREAM_CHECK_RET_GOTO(tDecodeSVDropTbBatchReq(&decoder, &batch));

  for (int32_t i = 0; i < batch.nReqs; i++) {
    SVDropTbReq* pReq = &batch.pReqs[i];
    STREAM_CHECK_NULL_GOTO(pReq, TSDB_CODE_INVALID_PARA);

    uint64_t groupId = 0;
    if (!isVTable) {
      groupId = qStreamGetGroupId(pTableList, pReq->uid);
      if (groupId == -1) {
        continue;
      }
    } else if (taosHashGet(pTableList, &pReq->uid, sizeof(pReq->uid)) == NULL) {
      continue;
    }

    STREAM_CHECK_RET_GOTO(buildWalMetaBlock(pBlock, WAL_DELETE_TABLE, groupId, isVTable, pReq->uid, 0, 0, ver, 1));
    pBlock->info.rows++;
    stDebug("stream reader scan drop :uid %" PRIu64 ", gid %" PRIu64, pReq->uid, groupId);
  }

end:
  tDecoderClear(&decoder);
  return code;
}
429

430
/*
 * Decodes a submit request from a WAL message body and appends a meta row to
 * pBlock for each table batch it contains (via retrieveWalMetaData, which
 * skips untracked tables).
 *
 * Before iterating, pBlock's capacity is grown to hold the current rows plus
 * one row per batch.
 *
 * Returns 0 on success or the first decode / capacity / per-batch error.
 * The decoded request and the decoder are released on every path.
 */
static int32_t scanSubmitData(void* pTableList, bool isVTable, SSDataBlock* pBlock, void* data, int32_t len,
                              int64_t ver) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SDecoder    decoder = {0};
  SSubmitReq2 submit = {0};

  tDecoderInit(&decoder, data, len);
  STREAM_CHECK_RET_GOTO(tDecodeSubmitReq(&decoder, &submit, NULL));

  int32_t numOfBlocks = taosArrayGetSize(submit.aSubmitTbData);
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, pBlock->info.rows + numOfBlocks));

  for (int32_t iBlk = 0; iBlk < numOfBlocks; iBlk++) {
    stDebug("stream reader scan submit, next data block %d/%d", iBlk, numOfBlocks);
    SSubmitTbData* pTbData = taosArrayGet(submit.aSubmitTbData, iBlk);
    STREAM_CHECK_NULL_GOTO(pTbData, terrno);
    STREAM_CHECK_RET_GOTO(retrieveWalMetaData(pTbData, pTableList, isVTable, pBlock, ver));
  }

end:
  tDestroySubmitReq(&submit, TSDB_MSG_FLG_DECODE);
  tDecoderClear(&decoder);
  return code;
}
456

457
/*
 * Scans the vnode's WAL starting at version lastVer + 1 and appends meta rows
 * to pBlock for submit messages and, when enabled, delete / drop-table
 * messages.
 *
 * deleteData / deleteTb  non-zero enables handling of TDMT_VND_DELETE /
 *                        TDMT_VND_DROP_TABLE messages respectively.
 * ctime                  scanning stops at the first message whose
 *                        ingestTs / 1000 is greater than this value.
 * retVer                 receives walGetAppliedVer, refreshed before each
 *                        message is fetched.
 *
 * The scan also stops, with success, when seeking or fetching the next valid
 * message fails, or once pBlock holds at least STREAM_RETURN_ROWS_NUM rows.
 *
 * Returns 0 on success or the first error from opening the reader or from a
 * per-message handler (also logged).
 */
static int32_t scanWal(SVnode* pVnode, void* pTableList, bool isVTable, SSDataBlock* pBlock, int64_t lastVer,
                       int8_t deleteData, int8_t deleteTb, int64_t ctime, int64_t* retVer) {
  int32_t code = 0;
  int32_t lino = 0;

  SWalReader* pReader = walOpenReader(pVnode->pWal, 0);
  STREAM_CHECK_NULL_GOTO(pReader, terrno);
  *retVer = walGetAppliedVer(pReader->pWal);
  STREAM_CHECK_CONDITION_GOTO(walReaderSeekVer(pReader, lastVer + 1) != 0, TSDB_CODE_SUCCESS);

  for (;;) {
    *retVer = walGetAppliedVer(pReader->pWal);
    STREAM_CHECK_CONDITION_GOTO(walNextValidMsg(pReader, true) < 0, TSDB_CODE_SUCCESS);

    SWalCont* pCont = &pReader->pHead->head;
    if (pCont->ingestTs / 1000 > ctime) {
      break;
    }
    int64_t ver = pCont->version;

    stDebug("vgId:%d stream reader scan wal ver:%" PRId64 ", type:%d, deleteData:%d, deleteTb:%d",
      TD_VID(pVnode), ver, pCont->msgType, deleteData, deleteTb);

    if (pCont->msgType == TDMT_VND_DELETE && deleteData != 0) {
      // Delete and drop payloads follow an SMsgHead header.
      void*   payload = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
      int32_t payloadLen = pCont->bodyLen - sizeof(SMsgHead);
      STREAM_CHECK_RET_GOTO(scanDeleteData(pTableList, isVTable, pBlock, payload, payloadLen, ver));
    } else if (pCont->msgType == TDMT_VND_DROP_TABLE && deleteTb != 0) {
      void*   payload = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
      int32_t payloadLen = pCont->bodyLen - sizeof(SMsgHead);
      STREAM_CHECK_RET_GOTO(scanDropTable(pTableList, isVTable, pBlock, payload, payloadLen, ver));
    } else if (pCont->msgType == TDMT_VND_SUBMIT) {
      // Submit payloads follow an SSubmitReq2Msg header instead.
      void*   payload = POINTER_SHIFT(pCont->body, sizeof(SSubmitReq2Msg));
      int32_t payloadLen = pCont->bodyLen - sizeof(SSubmitReq2Msg);
      STREAM_CHECK_RET_GOTO(scanSubmitData(pTableList, isVTable, pBlock, payload, payloadLen, ver));
    }

    if (pBlock->info.rows >= STREAM_RETURN_ROWS_NUM) {
      break;
    }
  }

end:
  walCloseReader(pReader);
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
499

500
/*
 * Reads the single WAL entry at version `ver`, which must be a submit
 * message, and merges the rows belonging to table `uid` into pBlockRet.
 *
 * pBlock     scratch block: filled per batch by retrieveWalData, merged into
 *            pBlockRet, then cleaned up.
 * window     optional time filter forwarded to retrieveWalData.
 *
 * Batches for other tables are skipped.
 *
 * Returns 0 on success, TSDB_CODE_STREAM_WAL_VER_NOT_DATA when the entry is
 * not a submit message, terrno when the reader cannot be opened, or the first
 * error from fetching, decoding, retrieving or merging (also logged).
 */
int32_t scanWalOneVer(SVnode* pVnode, SSDataBlock* pBlock, SSDataBlock* pBlockRet,
                      int64_t ver, int64_t uid, STimeWindow* window) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SDecoder    decoder = {0};
  SSubmitReq2 submit = {0};

  SWalReader* pReader = walOpenReader(pVnode->pWal, 0);
  STREAM_CHECK_NULL_GOTO(pReader, terrno);

  STREAM_CHECK_RET_GOTO(walFetchHead(pReader, ver));
  STREAM_CHECK_CONDITION_GOTO(pReader->pHead->head.msgType != TDMT_VND_SUBMIT, TSDB_CODE_STREAM_WAL_VER_NOT_DATA);
  STREAM_CHECK_RET_GOTO(walFetchBody(pReader));

  // The encoded submit request follows an SSubmitReq2Msg header inside the body.
  void*   pPayload = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SSubmitReq2Msg));
  int32_t payloadLen = pReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);

  tDecoderInit(&decoder, pPayload, payloadLen);
  STREAM_CHECK_RET_GOTO(tDecodeSubmitReq(&decoder, &submit, NULL));

  int32_t numOfBlocks = taosArrayGetSize(submit.aSubmitTbData);
  for (int32_t iBlk = 0; iBlk < numOfBlocks; iBlk++) {
    stDebug("stream reader next data block %d/%d", iBlk, numOfBlocks);
    SSubmitTbData* pTbData = taosArrayGet(submit.aSubmitTbData, iBlk);
    STREAM_CHECK_NULL_GOTO(pTbData, terrno);
    if (pTbData->uid != uid) {
      stDebug("stream reader skip data block uid:%" PRId64, pTbData->uid);
      continue;
    }

    STREAM_CHECK_RET_GOTO(retrieveWalData(pVnode, pTbData, pBlock, window));
    printDataBlock(pBlock, __func__, "");

    STREAM_CHECK_RET_GOTO(blockDataMerge(pBlockRet, pBlock));
    blockDataCleanup(pBlock);
  }

end:
  tDestroySubmitReq(&submit, TSDB_MSG_FLG_DECODE);
  walCloseReader(pReader);
  tDecoderClear(&decoder);
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
543

544
/*
 * Fills the tag and tbname output columns of pBlock for the table identified
 * by pBlock->info.id.uid.
 *
 * pExpr / numOfExpr  expressions describing the output columns; each one's
 *                    resSchema.slotId selects the destination column.
 * api                storage API used to read the table's meta entry.
 *
 * With numOfExpr == 0 the function returns success immediately. A failed meta
 * lookup is logged but processing continues; columns are then set to NULL for
 * all rows whenever the entry has no name. For pseudo-column functions only
 * FUNCTION_TYPE_TBNAME is handled. All other expressions are treated as tags:
 * the value is extracted from the child-table entry and replicated over every
 * row, or the column is set to NULL when the tag is missing or a JSON null.
 *
 * Returns 0 on success or the first error encountered (also logged).
 */
static int32_t processTag(SVnode* pVnode, SExprInfo* pExpr, int32_t numOfExpr, SStorageAPI* api, SSDataBlock* pBlock) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SMetaReader mr = {0};

  if (numOfExpr == 0) {
    return TSDB_CODE_SUCCESS;
  }

  api->metaReaderFn.initReader(&mr, pVnode, META_READER_LOCK, &api->metaFn);
  code = api->metaReaderFn.getEntryGetUidCache(&mr, pBlock->info.id.uid);
  if (code != TSDB_CODE_SUCCESS) {
    stError("failed to get table meta, uid:%" PRId64 ", code:%s", pBlock->info.id.uid, tstrerror(code));
  }
  api->metaReaderFn.readerReleaseLock(&mr);

  for (int32_t iExpr = 0; iExpr < numOfExpr; ++iExpr) {
    const SExprInfo* pCur = pExpr + iExpr;
    int32_t          slot = pCur->base.resSchema.slotId;

    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, slot);
    STREAM_CHECK_NULL_GOTO(pDst, terrno);

    // No entry name available: the column becomes NULL for every row.
    if (mr.me.name == NULL) {
      colDataSetNNULL(pDst, 0, pBlock->info.rows);
      continue;
    }
    colInfoDataCleanup(pDst, pBlock->info.rows);

    int32_t functionId = pCur->pExpr->_function.functionId;

    if (!fmIsScanPseudoColumnFunc(functionId)) {
      // Tag column.
      STagVal tagVal = {0};
      tagVal.cid = pCur->base.pParam[0].pCol->colId;
      const char* pTag = api->metaFn.extractTagVal(mr.me.ctbEntry.pTags, pDst->info.type, &tagVal);

      char* data = NULL;
      if (pDst->info.type != TSDB_DATA_TYPE_JSON && pTag != NULL) {
        data = tTagValToData((const STagVal*)pTag, false);
      } else {
        data = (char*)pTag;
      }

      bool isNullVal = (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
      if (isNullVal) {
        colDataSetNNULL(pDst, 0, pBlock->info.rows);
        continue;
      }

      code = colDataSetNItems(pDst, 0, data, pBlock->info.rows, false);
      // For variable-length tag types `data` is freed here, before the result check.
      if (IS_VAR_DATA_TYPE(((const STagVal*)pTag)->type)) {
        taosMemoryFree(data);
      }
      STREAM_CHECK_RET_GOTO(code);
      continue;
    }

    // Pseudo column: this is to handle the tbname.
    if (pCur->pExpr->_function.functionType == FUNCTION_TYPE_TBNAME) {
      STREAM_CHECK_RET_GOTO(setTbNameColData(pBlock, pDst, functionId, mr.me.name));
      pDst->info.colId = -1;
    }
  }

end:
  api->metaReaderFn.clearReader(&mr);

  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
610

611
/*
 * Builds the result block for one WAL version of table `uid`.
 *
 * The WAL entry is read into a block shaped like sStreamInfo->triggerResBlock,
 * tag/tbname columns are filled when rows were read, and the filter built
 * from sStreamInfo->pConditions is applied.
 *
 * isTrigger == true   *pBlock receives the filtered block itself.
 * isTrigger == false  *pBlock is created from pSrcBlock's layout and the
 *                     filtered block is transformed into it.
 *
 * Returns 0 on success or the first error (also logged). The filter and the
 * intermediate blocks are released on every path. When isTrigger is false, a
 * block already created in *pBlock is not released here on a later failure.
 */
static int32_t processWalVerData(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamInfo, int64_t ver, bool isTrigger,
                                 int64_t uid, STimeWindow* window, SSDataBlock* pSrcBlock, SSDataBlock** pBlock) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SFilterInfo* pFilter = NULL;
  SSDataBlock* pScratch = NULL;  // per-batch scratch block for scanWalOneVer
  SSDataBlock* pMerged = NULL;   // accumulated rows of this WAL version

  int32_t    numOfExpr = sStreamInfo->numOfExpr;
  SExprInfo* pExpr = sStreamInfo->pExprInfo;

  STREAM_CHECK_RET_GOTO(filterInitFromNode(sStreamInfo->pConditions, &pFilter, 0, NULL));

  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamInfo->triggerResBlock, false, &pScratch));
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamInfo->triggerResBlock, false, &pMerged));
  if (!isTrigger) {
    STREAM_CHECK_RET_GOTO(createOneDataBlock(pSrcBlock, false, pBlock));
  }

  pMerged->info.id.uid = uid;
  pScratch->info.id.uid = uid;

  STREAM_CHECK_RET_GOTO(scanWalOneVer(pVnode, pScratch, pMerged, ver, uid, window));

  if (pMerged->info.rows > 0) {
    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(processTag(pVnode, pExpr, numOfExpr, &api, pMerged));
  }
  STREAM_CHECK_RET_GOTO(qStreamFilter(pMerged, pFilter));

  if (isTrigger) {
    // Ownership moves to the caller; clear the local so it is not destroyed below.
    *pBlock = pMerged;
    pMerged = NULL;
  } else {
    blockDataTransform(*pBlock, pMerged);
  }

  printDataBlock(*pBlock, __func__, "processWalVerData2");

end:
  STREAM_PRINT_LOG_END(code, lino);
  filterFreeInfo(pFilter);
  blockDataDestroy(pScratch);
  blockDataDestroy(pMerged);
  return code;
}
655

656
/*
 * Build an array of SSchema entries (one per column) for the table identified by `uid`.
 *
 * For a child table the column schema is taken from its super table entry; for a normal table it is
 * taken from the table entry itself. Any other table type is rejected with TSDB_CODE_INVALID_PARA.
 *
 * On success *schemas holds a newly created array that the caller must destroy.
 * On failure the array is destroyed here and *schemas is reset to NULL, so a caller that
 * unconditionally destroys its pointer during cleanup does not free it a second time.
 */
static int32_t buildScheamFromMeta(SVnode* pVnode, int64_t uid, SArray** schemas) {
  int32_t code = 0;
  int32_t lino = 0;
  SMetaReader metaReader = {0};
  SStorageAPI api = {0};
  initStorageAPI(&api);
  *schemas = taosArrayInit(8, sizeof(SSchema));
  STREAM_CHECK_NULL_GOTO(*schemas, terrno);

  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
  STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, uid));

  SSchemaWrapper* sSchemaWrapper = NULL;
  if (metaReader.me.type == TD_CHILD_TABLE) {
    // A child table does not carry the column schema itself: re-read the reader at the super table.
    int64_t suid = metaReader.me.ctbEntry.suid;
    tDecoderClear(&metaReader.coder);
    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, suid));
    sSchemaWrapper = &metaReader.me.stbEntry.schemaRow;
  } else if (metaReader.me.type == TD_NORMAL_TABLE) {
    sSchemaWrapper = &metaReader.me.ntbEntry.schemaRow;
  } else {
    // No schema source for this type: fail instead of dereferencing a NULL wrapper below.
    qError("invalid table type:%d", metaReader.me.type);
    code = TSDB_CODE_INVALID_PARA;
    lino = __LINE__;
    goto end;
  }

  for (int32_t j = 0; j < sSchemaWrapper->nCols; j++) {
    SSchema* s = sSchemaWrapper->pSchema + j;
    STREAM_CHECK_NULL_GOTO(taosArrayPush(*schemas, s), terrno);
  }

end:
  api.metaReaderFn.clearReader(&metaReader);
  STREAM_PRINT_LOG_END(code, lino);
  if (code != 0) {
    taosArrayDestroy(*schemas);
    *schemas = NULL;  // never leave a freed pointer visible to the caller
  }
  return code;
}
691

692
/*
 * Reduce `schemas` in place to the entries whose colId appears in `cols`, ordered as in `cols`.
 *
 * Matching entries are first appended behind the original content, then the original prefix is
 * popped off the front. Column ids without a matching schema entry are silently skipped.
 */
static int32_t shrinkScheams(SArray* cols, SArray* schemas) {
  int32_t code = 0;
  int32_t lino = 0;
  size_t  nOriginal = taosArrayGetSize(schemas);

  // Reserve room for every possible append up front.
  // NOTE(review): presumably this keeps `candidate` (which points into `schemas`) valid across the push - confirm.
  STREAM_CHECK_RET_GOTO(taosArrayEnsureCap(schemas, nOriginal + taosArrayGetSize(cols)));

  for (size_t colIdx = 0; colIdx < taosArrayGetSize(cols); colIdx++) {
    col_id_t* wanted = taosArrayGet(cols, colIdx);
    STREAM_CHECK_NULL_GOTO(wanted, terrno);

    // Only the original entries are searched; the appended copies live behind nOriginal.
    for (size_t schemaIdx = 0; schemaIdx < nOriginal; schemaIdx++) {
      SSchema* candidate = taosArrayGet(schemas, schemaIdx);
      STREAM_CHECK_NULL_GOTO(candidate, terrno);
      if (candidate->colId != *wanted) {
        continue;
      }
      STREAM_CHECK_NULL_GOTO(taosArrayPush(schemas, candidate), terrno);
      break;
    }
  }
  taosArrayPopFrontBatch(schemas, nOriginal);

end:
  return code;
}
714

715
/*
 * Scan WAL version `ver` for table `uid` and return the rows restricted to the columns in `cids`.
 *
 * The block layout is derived from the table's meta schema, narrowed to `cids`.
 * On success the result block is handed to the caller through *pBlock.
 */
static int32_t processWalVerDataVTable(SVnode* pVnode, SArray *cids, int64_t ver,
  int64_t uid, STimeWindow* window, SSDataBlock** pBlock) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SArray*      schemas = NULL;
  SSDataBlock* pScanAux = NULL;  // first block handed to scanWalOneVer; only released here
  SSDataBlock* pScanRes = NULL;  // second block handed to scanWalOneVer; returned to the caller

  STREAM_CHECK_RET_GOTO(buildScheamFromMeta(pVnode, uid, &schemas));
  STREAM_CHECK_RET_GOTO(shrinkScheams(cids, schemas));
  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, &pScanAux));
  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, &pScanRes));

  pScanRes->info.id.uid = uid;
  pScanAux->info.id.uid = uid;

  STREAM_CHECK_RET_GOTO(scanWalOneVer(pVnode, pScanAux, pScanRes, ver, uid, window));
  printDataBlock(pScanRes, __func__, "");

  // Hand the block over; clearing the local keeps the cleanup below from freeing it.
  *pBlock = pScanRes;
  pScanRes = NULL;

end:
  STREAM_PRINT_LOG_END(code, lino);
  blockDataDestroy(pScanAux);
  blockDataDestroy(pScanRes);
  taosArrayDestroy(schemas);
  return code;
}
745

746
static int32_t createTSAndCondition(int64_t start, int64_t end, SLogicConditionNode** pCond,
263,143✔
747
                                    STargetNode* pTargetNodeTs) {
748
  int32_t code = 0;
263,143✔
749
  int32_t lino = 0;
263,143✔
750

751
  SColumnNode*         pCol = NULL;
263,143✔
752
  SColumnNode*         pCol1 = NULL;
263,143✔
753
  SValueNode*          pVal = NULL;
263,143✔
754
  SValueNode*          pVal1 = NULL;
263,143✔
755
  SOperatorNode*       op = NULL;
263,143✔
756
  SOperatorNode*       op1 = NULL;
263,143✔
757
  SLogicConditionNode* cond = NULL;
263,143✔
758

759
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pCol));
263,143!
760
  pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
263,143✔
761
  pCol->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
263,143✔
762
  pCol->node.resType.bytes = LONG_BYTES;
263,143✔
763
  pCol->slotId = pTargetNodeTs->slotId;
263,143✔
764
  pCol->dataBlockId = pTargetNodeTs->dataBlockId;
263,143✔
765

766
  STREAM_CHECK_RET_GOTO(nodesCloneNode((SNode*)pCol, (SNode**)&pCol1));
263,143!
767

768
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&pVal));
263,143!
769
  pVal->node.resType.type = TSDB_DATA_TYPE_BIGINT;
263,143✔
770
  pVal->node.resType.bytes = LONG_BYTES;
263,143✔
771
  pVal->datum.i = start;
263,143✔
772
  pVal->typeData = start;
263,143✔
773

774
  STREAM_CHECK_RET_GOTO(nodesCloneNode((SNode*)pVal, (SNode**)&pVal1));
263,143!
775
  pVal1->datum.i = end;
263,143✔
776
  pVal1->typeData = end;
263,143✔
777

778
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&op));
263,143!
779
  op->opType = OP_TYPE_GREATER_EQUAL;
263,143✔
780
  op->node.resType.type = TSDB_DATA_TYPE_BOOL;
263,143✔
781
  op->node.resType.bytes = CHAR_BYTES;
263,143✔
782
  op->pLeft = (SNode*)pCol;
263,143✔
783
  op->pRight = (SNode*)pVal;
263,143✔
784
  pCol = NULL;
263,143✔
785
  pVal = NULL;
263,143✔
786

787
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&op1));
263,143!
788
  op1->opType = OP_TYPE_LOWER_EQUAL;
263,143✔
789
  op1->node.resType.type = TSDB_DATA_TYPE_BOOL;
263,143✔
790
  op1->node.resType.bytes = CHAR_BYTES;
263,143✔
791
  op1->pLeft = (SNode*)pCol1;
263,143✔
792
  op1->pRight = (SNode*)pVal1;
263,143✔
793
  pCol1 = NULL;
263,143✔
794
  pVal1 = NULL;
263,143✔
795

796
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_LOGIC_CONDITION, (SNode**)&cond));
263,143!
797
  cond->condType = LOGIC_COND_TYPE_AND;
263,143✔
798
  cond->node.resType.type = TSDB_DATA_TYPE_BOOL;
263,143✔
799
  cond->node.resType.bytes = CHAR_BYTES;
263,143✔
800
  STREAM_CHECK_RET_GOTO(nodesMakeList(&cond->pParameterList));
263,143!
801
  STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)op));
263,143!
802
  op = NULL;
263,143✔
803
  STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)op1));
263,143!
804
  op1 = NULL;
263,143✔
805

806
  *pCond = cond;
263,143✔
807

808
end:
263,143✔
809
  if (code != 0) {
263,143!
810
    nodesDestroyNode((SNode*)pCol);
×
811
    nodesDestroyNode((SNode*)pCol1);
×
812
    nodesDestroyNode((SNode*)pVal);
×
813
    nodesDestroyNode((SNode*)pVal1);
×
814
    nodesDestroyNode((SNode*)op);
×
815
    nodesDestroyNode((SNode*)op1);
×
816
    nodesDestroyNode((SNode*)cond);
×
817
  }
818
  STREAM_PRINT_LOG_END(code, lino);
263,143!
819

820
  return code;
263,143✔
821
}
822

823
/*
 * Build an OR condition with one `ts >= skey AND ts <= ekey` term per entry of
 * data->pStreamPesudoFuncVals.
 *
 * For every entry, data->curIdx is set to its index and calcTimeRange() computes the window;
 * entries whose range is reported invalid are logged and skipped. On success the OR node is stored
 * in *pCond; on failure the partially built tree is destroyed and *pCond is left untouched.
 *
 * Returns TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN when pTargetNodeTs is NULL.
 */
static int32_t createExternalConditions(SStreamRuntimeFuncInfo* data, SLogicConditionNode** pCond, STargetNode* pTargetNodeTs, STimeRangeNode* node) {
  int32_t              code = 0;
  int32_t              lino = 0;
  SLogicConditionNode* pWindowCond = NULL;  // AND term of the entry currently being processed
  SLogicConditionNode* pOrCond = NULL;

  if (pTargetNodeTs == NULL) {
    vError("stream reader %s no ts column", __func__);
    return TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN;
  }

  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_LOGIC_CONDITION, (SNode**)&pOrCond));
  pOrCond->condType = LOGIC_COND_TYPE_OR;
  pOrCond->node.resType.type = TSDB_DATA_TYPE_BOOL;
  pOrCond->node.resType.bytes = CHAR_BYTES;
  STREAM_CHECK_RET_GOTO(nodesMakeList(&pOrCond->pParameterList));

  for (int i = 0; i < taosArrayGetSize(data->pStreamPesudoFuncVals); ++i) {
    // calcTimeRange is handed `data`, so the current index is published through it first.
    data->curIdx = i;

    SReadHandle handle = {0};
    calcTimeRange(node, data, &handle.winRange, &handle.winRangeValid);
    if (!handle.winRangeValid) {
      stError("stream reader %s invalid time range, skey:%" PRId64 ", ekey:%" PRId64, __func__, handle.winRange.skey,
              handle.winRange.ekey);
      continue;
    }

    STREAM_CHECK_RET_GOTO(createTSAndCondition(handle.winRange.skey, handle.winRange.ekey, &pWindowCond, pTargetNodeTs));
    stDebug("%s create condition skey:%" PRId64 ", eksy:%" PRId64, __func__, handle.winRange.skey, handle.winRange.ekey);
    STREAM_CHECK_RET_GOTO(nodesListAppend(pOrCond->pParameterList, (SNode*)pWindowCond));
    pWindowCond = NULL;  // now owned by the OR node's parameter list
  }

  *pCond = pOrCond;

end:
  if (code != 0) {
    nodesDestroyNode((SNode*)pWindowCond);
    nodesDestroyNode((SNode*)pOrCond);
  }
  STREAM_PRINT_LOG_END(code, lino);

  return code;
}
866

867
static int32_t processCalaTimeRange(SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo, SResFetchReq* req,
10,852✔
868
                                    STimeRangeNode* node, SReadHandle* handle) {
869
  int32_t code = 0;
10,852✔
870
  int32_t lino = 0;
10,852✔
871
  SArray* funcVals = NULL;
10,852✔
872
  if (req->pStRtFuncInfo->withExternalWindow) {
10,852✔
873
    nodesDestroyNode(sStreamReaderCalcInfo->tsConditions);
192✔
874
    filterFreeInfo(sStreamReaderCalcInfo->pFilterInfo);
192✔
875
    sStreamReaderCalcInfo->pFilterInfo = NULL;
192✔
876

877
    STREAM_CHECK_RET_GOTO(createExternalConditions(req->pStRtFuncInfo,
192!
878
                                                   (SLogicConditionNode**)&sStreamReaderCalcInfo->tsConditions,
879
                                                   sStreamReaderCalcInfo->pTargetNodeTs, node));
880

881
    STREAM_CHECK_RET_GOTO(filterInitFromNode((SNode*)sStreamReaderCalcInfo->tsConditions,
192!
882
                                             (SFilterInfo**)&sStreamReaderCalcInfo->pFilterInfo,
883
                                             FLT_OPTION_NO_REWRITE | FLT_OPTION_SCALAR_MODE, NULL));
884
    SSTriggerCalcParam* pFirst = taosArrayGet(req->pStRtFuncInfo->pStreamPesudoFuncVals, 0);
192✔
885
    SSTriggerCalcParam* pLast = taosArrayGetLast(req->pStRtFuncInfo->pStreamPesudoFuncVals);
192✔
886
    STREAM_CHECK_NULL_GOTO(pFirst, terrno);
192!
887
    STREAM_CHECK_NULL_GOTO(pLast, terrno);
192!
888

889
    handle->winRange.skey = pFirst->wstart;
192✔
890
    handle->winRange.ekey = pLast->wend;
192✔
891
    handle->winRangeValid = true;
192✔
892
    stDebug("%s withExternalWindow is true, skey:%" PRId64 ", ekey:%" PRId64, __func__, pFirst->wstart, pLast->wend);
192✔
893
  } else {
894
    calcTimeRange(node, req->pStRtFuncInfo, &handle->winRange, &handle->winRangeValid);
10,660✔
895
  }
896

897
end:
10,852✔
898
  taosArrayDestroy(funcVals);
10,852✔
899
  return code;
10,852✔
900
}
901

902
/*
 * Create the data block used to report WAL meta rows.
 *
 * Column layout: type, [gid - omitted when isVTable], uid, skey, ekey, ver, nrows.
 * On success *pBlock receives the new block.
 */
static int32_t createBlockForWalMeta(SSDataBlock** pBlock, bool isVTable) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t colPos = 0;  // running value handed to qStreamBuildSchema, one step per column

  SArray* schemas = taosArrayInit(8, sizeof(SSchema));
  STREAM_CHECK_NULL_GOTO(schemas, terrno);

  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TINYINT, CHAR_BYTES, colPos++));  // type
  if (!isVTable) {
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, colPos++));  // gid
  }
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, colPos++));  // uid
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, colPos++));  // skey
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, colPos++));  // ekey
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, colPos++));  // ver
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, colPos++));  // nrows

  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, pBlock));

end:
  // The schema list is released on every path once block creation has been attempted.
  taosArrayDestroy(schemas);
  return code;
}
927

928
static int32_t createOptionsForLastTs(SStreamTriggerReaderTaskInnerOptions* options,
342✔
929
                                      SStreamTriggerReaderInfo*             sStreamReaderInfo) {
930
  int32_t code = 0;
342✔
931
  int32_t lino = 0;
342✔
932
  SArray* schemas = NULL;
342✔
933

934
  schemas = taosArrayInit(4, sizeof(SSchema));
342✔
935
  STREAM_CHECK_NULL_GOTO(schemas, terrno)
343!
936
  STREAM_CHECK_RET_GOTO(
343!
937
      qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, PRIMARYKEY_TIMESTAMP_COL_ID))  // last ts
938

939
  BUILD_OPTION(op, sStreamReaderInfo, -1, true, TSDB_ORDER_DESC, INT64_MIN, INT64_MAX, schemas, true,
343✔
940
               STREAM_SCAN_GROUP_ONE_BY_ONE, 0, sStreamReaderInfo->uidList == NULL, NULL);
941
  schemas = NULL;
343✔
942
  *options = op;
343✔
943

944
end:
343✔
945
  taosArrayDestroy(schemas);
343✔
946
  return code;
343✔
947
}
948

949
static int32_t createOptionsForFirstTs(SStreamTriggerReaderTaskInnerOptions* options,
365✔
950
                                       SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t start, int64_t ver) {
951
  int32_t code = 0;
365✔
952
  int32_t lino = 0;
365✔
953
  SArray* schemas = NULL;
365✔
954

955
  schemas = taosArrayInit(4, sizeof(SSchema));
365✔
956
  STREAM_CHECK_NULL_GOTO(schemas, terrno)
365!
957
  STREAM_CHECK_RET_GOTO(
365!
958
      qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, PRIMARYKEY_TIMESTAMP_COL_ID))  // first ts
959

960
  BUILD_OPTION(op, sStreamReaderInfo, ver, true, TSDB_ORDER_ASC, start, INT64_MAX, schemas, true,
365✔
961
               STREAM_SCAN_GROUP_ONE_BY_ONE, 0, sStreamReaderInfo->uidList == NULL, NULL);
962
  schemas = NULL;
365✔
963

964
  *options = op;
365✔
965
end:
365✔
966
  taosArrayDestroy(schemas);
365✔
967
  return code;
365✔
968
}
969

970
static int32_t createOptionsForTsdbMeta(SStreamTriggerReaderTaskInnerOptions* options,
1,264✔
971
                                        SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t start, int64_t end,
972
                                        int64_t gid, int8_t order, int64_t ver, bool onlyTs) {
973
  int32_t code = 0;
1,264✔
974
  int32_t lino = 0;
1,264✔
975
  SArray* schemas = NULL;
1,264✔
976

977
  int32_t index = 1;
1,264✔
978
  schemas = taosArrayInit(8, sizeof(SSchema));
1,264✔
979
  STREAM_CHECK_NULL_GOTO(schemas, terrno);
1,264!
980
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, index++))  // skey
1,264!
981
  if (!onlyTs){
1,264✔
982
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, index++))  // ekey
628!
983
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // uid
628!
984
    if (sStreamReaderInfo->uidList == NULL) {
628✔
985
      STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_UBIGINT, LONG_BYTES, index++))  // gid
153!
986
    }
987
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))     // nrows
628!
988
  }
989
  
990
  BUILD_OPTION(op, sStreamReaderInfo, ver, true, order, start, end, schemas, true, (gid != 0 ? STREAM_SCAN_GROUP_ONE_BY_ONE : STREAM_SCAN_ALL), gid,
1,264✔
991
               true, sStreamReaderInfo->uidList);
992
  schemas = NULL;
1,264✔
993
  *options = op;
1,264✔
994

995
end:
1,264✔
996
  taosArrayDestroy(schemas);
1,264✔
997
  return code;
1,264✔
998
}
999

1000
/*
 * Ascending comparator for array sorting: compares the int64_t found at the start of each element.
 * Returns -1, 0 or 1 when the first value is smaller than, equal to or greater than the second.
 */
static int taosCompareInt64Asc(const void* elem1, const void* elem2) {
  const int64_t lhs = *(const int64_t*)elem1;
  const int64_t rhs = *(const int64_t*)elem2;

  // Branch-free three-way compare; avoids the overflow a plain subtraction could hit.
  return (lhs > rhs) - (lhs < rhs);
}
1010

1011
static int32_t getTableList(SArray** pList, int32_t* pNum, int64_t* suid, int32_t index,
86✔
1012
                            SStreamTriggerReaderInfo* sStreamReaderInfo) {
1013
  int32_t code = 0;
86✔
1014
  int32_t lino = 0;
86✔
1015

1016
  int32_t* start = taosArrayGet(sStreamReaderInfo->uidListIndex, index);
86✔
1017
  STREAM_CHECK_NULL_GOTO(start, terrno);
86!
1018
  int32_t  iStart = *start;
86✔
1019
  int32_t* end = taosArrayGet(sStreamReaderInfo->uidListIndex, index + 1);
86✔
1020
  STREAM_CHECK_NULL_GOTO(end, terrno);
86!
1021
  int32_t iEnd = *end;
86✔
1022
  *pList = taosArrayInit(iEnd - iStart, sizeof(STableKeyInfo));
86✔
1023
  STREAM_CHECK_NULL_GOTO(*pList, terrno);
86!
1024
  for (int32_t i = iStart; i < iEnd; ++i) {
259✔
1025
    int64_t*       uid = taosArrayGet(sStreamReaderInfo->uidList, i);
173✔
1026
    STableKeyInfo* info = taosArrayReserve(*pList, 1);
173✔
1027
    STREAM_CHECK_NULL_GOTO(info, terrno);
173!
1028
    *suid = uid[0];
173✔
1029
    info->uid = uid[1];
173✔
1030
  }
1031
  *pNum = iEnd - iStart;
86✔
1032
end:
86✔
1033
  STREAM_PRINT_LOG_END(code, lino);
86!
1034
  return code;
86✔
1035
}
1036

1037
static int32_t processTsNonVTable(SVnode* pVnode, SStreamTsResponse* tsRsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
622✔
1038
                                  SStreamReaderTaskInner* pTask) {
1039
  int32_t code = 0;
622✔
1040
  int32_t lino = 0;
622✔
1041

1042
  tsRsp->tsInfo = taosArrayInit(qStreamGetTableListGroupNum(pTask->pTableList), sizeof(STsInfo));
622✔
1043
  STREAM_CHECK_NULL_GOTO(tsRsp->tsInfo, terrno);
622!
1044
  while (true) {
660✔
1045
    bool hasNext = false;
1,282✔
1046
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTask, &hasNext));
1,282!
1047
    if (hasNext) {
1,282✔
1048
      pTask->api.tsdReader.tsdReaderReleaseDataBlock(pTask->pReader);
650✔
1049
      STsInfo* tsInfo = taosArrayReserve(tsRsp->tsInfo, 1);
650✔
1050
      STREAM_CHECK_NULL_GOTO(tsInfo, terrno)
650!
1051
      if (pTask->options.order == TSDB_ORDER_ASC) {
650✔
1052
        tsInfo->ts = pTask->pResBlock->info.window.skey;
545✔
1053
      } else {
1054
        tsInfo->ts = pTask->pResBlock->info.window.ekey;
105✔
1055
      }
1056
      tsInfo->gId = qStreamGetGroupId(pTask->pTableList, pTask->pResBlock->info.id.uid);
650✔
1057
      stDebug("vgId:%d %s get last ts:%" PRId64 ", gId:%" PRIu64 ", ver:%" PRId64, TD_VID(pVnode), __func__, tsInfo->ts,
650✔
1058
              tsInfo->gId, tsRsp->ver);
1059
    }
1060

1061
    pTask->currentGroupIndex++;
1,282✔
1062
    if (pTask->currentGroupIndex >= qStreamGetTableListGroupNum(pTask->pTableList)) {
1,282✔
1063
      break;
622✔
1064
    }
1065
    STREAM_CHECK_RET_GOTO(resetTsdbReader(pTask));
659!
1066
  }
1067

1068
end:
622✔
1069
  return code;
622✔
1070
}
1071

1072
static int32_t processTsVTable(SVnode* pVnode, SStreamTsResponse* tsRsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
86✔
1073
                               SStreamReaderTaskInner* pTask, int64_t ver) {
1074
  int32_t code = 0;
86✔
1075
  int32_t lino = 0;
86✔
1076
  int64_t suid = 0;
86✔
1077
  SArray* pList = NULL;
86✔
1078
  int32_t pNum = 0;
86✔
1079

1080
  tsRsp->tsInfo = taosArrayInit(taosArrayGetSize(sStreamReaderInfo->uidList), sizeof(STsInfo));
86✔
1081
  STREAM_CHECK_NULL_GOTO(tsRsp->tsInfo, terrno);
86!
1082
  for (int32_t i = 0; i < taosArrayGetSize(sStreamReaderInfo->uidListIndex) - 1; ++i) {
172✔
1083
    STREAM_CHECK_RET_GOTO(getTableList(&pList, &pNum, &suid, i, sStreamReaderInfo));
86!
1084

1085
    cleanupQueryTableDataCond(&pTask->cond);
86✔
1086
    STREAM_CHECK_RET_GOTO(qStreamInitQueryTableDataCond(&pTask->cond, pTask->options.order, pTask->options.schemas,
86!
1087
                                                        pTask->options.isSchema, pTask->options.twindows, suid, ver));
1088
    STREAM_CHECK_RET_GOTO(pTask->api.tsdReader.tsdReaderOpen(
86!
1089
        pVnode, &pTask->cond, taosArrayGet(pList, 0), pNum, pTask->pResBlock, (void**)&pTask->pReader, pTask->idStr, NULL));
1090
    taosArrayDestroy(pList);
86✔
1091
    pList = NULL;
86✔
1092
    while (true) {
91✔
1093
      bool hasNext = false;
177✔
1094
      STREAM_CHECK_RET_GOTO(getTableDataInfo(pTask, &hasNext));
177!
1095
      if (!hasNext) {
177✔
1096
        break;
86✔
1097
      }
1098
      pTask->api.tsdReader.tsdReaderReleaseDataBlock(pTask->pReader);
91✔
1099
      STsInfo* tsInfo = taosArrayReserve(tsRsp->tsInfo, 1);
91✔
1100
      STREAM_CHECK_NULL_GOTO(tsInfo, terrno)
91!
1101
      if (pTask->options.order == TSDB_ORDER_ASC) {
91✔
1102
        tsInfo->ts = pTask->pResBlock->info.window.skey;
71✔
1103
      } else {
1104
        tsInfo->ts = pTask->pResBlock->info.window.ekey;
20✔
1105
      }
1106
      tsInfo->gId = pTask->pResBlock->info.id.uid;
91✔
1107
      stDebug("vgId:%d %s get vtable last ts:%" PRId64 ", uid:%" PRIu64 ", ver:%" PRId64, TD_VID(pVnode), __func__,
91✔
1108
              tsInfo->ts, tsInfo->gId, tsRsp->ver);
1109
    }
1110
    pTask->api.tsdReader.tsdReaderClose(pTask->pReader);
86✔
1111
    pTask->pReader = NULL;
86✔
1112
  }
1113

1114
end:
86✔
1115
  taosArrayDestroy(pList);
86✔
1116
  return code;
86✔
1117
}
1118

1119
/*
 * Point `options` at a specific table.
 *
 * A non-zero `suid` replaces options->suid; `uid` always replaces options->uid. The table type is
 * then derived from the resulting suid: child table when it is non-zero, normal table otherwise.
 */
static void reSetUid(SStreamTriggerReaderTaskInnerOptions* options, int64_t suid, int64_t uid) {
  if (suid != 0) {
    options->suid = suid;
  }
  options->uid = uid;
  options->tableType = (options->suid != 0) ? TD_CHILD_TABLE : TD_NORMAL_TABLE;
}
314✔
1128

1129
static int32_t createOptionsForTsdbData(SVnode* pVnode, SStreamTriggerReaderTaskInnerOptions* options,
268✔
1130
                                        SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t uid, SArray* cols,
1131
                                        int8_t order,int64_t skey, int64_t ekey, int64_t ver) {
1132
  int32_t code = 0;
268✔
1133
  int32_t lino = 0;
268✔
1134
  SArray* schemas = NULL;
268✔
1135

1136
  STREAM_CHECK_RET_GOTO(buildScheamFromMeta(pVnode, uid, &schemas));
268!
1137
  STREAM_CHECK_RET_GOTO(shrinkScheams(cols, schemas));
268!
1138
  BUILD_OPTION(op, sStreamReaderInfo, ver, true, order, skey, ekey, schemas, true, STREAM_SCAN_ALL, 0, false, NULL);
268✔
1139
  *options = op;
268✔
1140

1141
end:
268✔
1142
  return code;
268✔
1143
}
1144

1145
/*
 * Handle a "set table" pull request: install the request's table list in the reader and rebuild
 * the lookup structures derived from it.
 *
 * The request's uid array is swapped into sStreamReaderInfo->uidList and sorted. Each entry is read
 * as two consecutive int64_t values (suid, uid). uidListIndex receives the start index of every run
 * of equal suid (an entry with suid 0 always starts a new run) followed by the total entry count,
 * and uidHash maps each uid to its position in uidList.
 *
 * A response carrying the result code and no payload is always sent back.
 */
static int32_t vnodeProcessStreamSetTableReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;

  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  // Take over the request's list; the previous list ends up in the request structure.
  TSWAP(sStreamReaderInfo->uidList, req->setTableReq.uids);
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidList, TSDB_CODE_INVALID_PARA);

  taosArraySort(sStreamReaderInfo->uidList, taosCompareInt64Asc);

  // Reuse the index array and the hash when they already exist, otherwise create them.
  if (sStreamReaderInfo->uidListIndex == NULL) {
    sStreamReaderInfo->uidListIndex = taosArrayInit(taosArrayGetSize(sStreamReaderInfo->uidList), sizeof(int32_t));
    STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidListIndex, TSDB_CODE_INVALID_PARA);
  } else {
    taosArrayClear(sStreamReaderInfo->uidListIndex);
  }

  if (sStreamReaderInfo->uidHash == NULL) {
    sStreamReaderInfo->uidHash = taosHashInit(taosArrayGetSize(sStreamReaderInfo->uidList),
                                              taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidHash, TSDB_CODE_INVALID_PARA);
  } else {
    taosHashClear(sStreamReaderInfo->uidHash);
  }

  int64_t prevSuid = 0;
  int32_t cnt = taosArrayGetSize(sStreamReaderInfo->uidList);
  for (int32_t i = 0; i < cnt; ++i) {
    int64_t* entry = taosArrayGet(sStreamReaderInfo->uidList, i);
    STREAM_CHECK_NULL_GOTO(entry, terrno);

    // A new run starts whenever the suid changes; suid 0 never merges with its neighbours.
    if (entry[0] == 0 || entry[0] != prevSuid) {
      STREAM_CHECK_NULL_GOTO(taosArrayPush(sStreamReaderInfo->uidListIndex, &i), terrno);
    }
    prevSuid = entry[0];
    stDebug("vgId:%d %s suid:%" PRId64 ",uid:%" PRId64 ", index:%d", TD_VID(pVnode), __func__, prevSuid, entry[1], i);
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->uidHash, entry + 1, LONG_BYTES, &i, sizeof(int32_t)));
  }
  // Closing sentinel: the end of the last run.
  STREAM_CHECK_NULL_GOTO(taosArrayPush(sStreamReaderInfo->uidListIndex, &cnt), terrno);

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP,
      .info = pMsg->info,
      .pCont = buf,
      .contLen = size,
      .code = code,
  };
  tmsgSendRsp(&rsp);
  return code;
}
1195

1196
/*
 * Handle a "last ts" pull request: report the last timestamp of the tables covered by the reader
 * together with the vnode's applied version.
 *
 * When the reader has a uidList the timestamps are collected by processTsVTable(), otherwise by
 * processTsNonVTable(). The encoded result, or just the error code on failure, is always sent
 * back, and the temporary reader task is released before returning.
 */
static int32_t vnodeProcessStreamLastTsReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  SStreamReaderTaskInner* pTaskInner = NULL;
  SStreamTsResponse       tsRsp = {0};
  void*                   buf = NULL;
  size_t                  size = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;

  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStreamTriggerReaderTaskInnerOptions options = {0};
  STREAM_CHECK_RET_GOTO(createOptionsForLastTs(&options, sStreamReaderInfo));

  SStorageAPI api = {0};
  initStorageAPI(&api);
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, NULL, &api));

  tsRsp.ver = pVnode->state.applied;
  if (sStreamReaderInfo->uidList == NULL) {
    STREAM_CHECK_RET_GOTO(processTsNonVTable(pVnode, &tsRsp, sStreamReaderInfo, pTaskInner));
  } else {
    STREAM_CHECK_RET_GOTO(processTsVTable(pVnode, &tsRsp, sStreamReaderInfo, pTaskInner, -1));
  }

  ST_TASK_DLOG("vgId:%d %s get result", TD_VID(pVnode), __func__);
  STREAM_CHECK_RET_GOTO(buildTsRsp(&tsRsp, &buf, &size));

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rpcRsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP,
      .info = pMsg->info,
      .pCont = buf,
      .contLen = size,
      .code = code,
  };
  tmsgSendRsp(&rpcRsp);
  taosArrayDestroy(tsRsp.tsInfo);
  releaseStreamTask(&pTaskInner);
  return code;
}
1234

1235
/*
 * Handle a "first ts" pull request: report the first timestamp at or after
 * req->firstTsReq.startTime for the tables covered by the reader, together with the vnode's
 * applied version.
 *
 * When the reader has a uidList the timestamps are collected by processTsVTable(), otherwise by
 * processTsNonVTable(). The encoded result, or just the error code on failure, is always sent
 * back, and the temporary reader task is released before returning.
 */
static int32_t vnodeProcessStreamFirstTsReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  SStreamReaderTaskInner* pTaskInner = NULL;
  SStreamTsResponse       tsRsp = {0};
  void*                   buf = NULL;
  size_t                  size = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStreamTriggerReaderTaskInnerOptions options = {0};
  STREAM_CHECK_RET_GOTO(createOptionsForFirstTs(&options, sStreamReaderInfo, req->firstTsReq.startTime, req->firstTsReq.ver));

  SStorageAPI api = {0};
  initStorageAPI(&api);
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, NULL, &api));

  tsRsp.ver = pVnode->state.applied;
  if (sStreamReaderInfo->uidList == NULL) {
    STREAM_CHECK_RET_GOTO(processTsNonVTable(pVnode, &tsRsp, sStreamReaderInfo, pTaskInner));
  } else {
    STREAM_CHECK_RET_GOTO(processTsVTable(pVnode, &tsRsp, sStreamReaderInfo, pTaskInner, req->firstTsReq.ver));
  }

  ST_TASK_DLOG("vgId:%d %s get result", TD_VID(pVnode), __func__);
  STREAM_CHECK_RET_GOTO(buildTsRsp(&tsRsp, &buf, &size));

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rpcRsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP,
      .info = pMsg->info,
      .pCont = buf,
      .contLen = size,
      .code = code,
  };
  tmsgSendRsp(&rpcRsp);
  taosArrayDestroy(tsRsp.tsInfo);
  releaseStreamTask(&pTaskInner);
  return code;
}
1271

1272
/*
 * Handle a TSDB meta pull request: return one row of block meta (skey, ekey, uid, [gid], nrows)
 * per data block in the requested time range, at most STREAM_RETURN_ROWS_NUM rows per response.
 *
 * A request of type STRIGGER_PULL_TSDB_META creates a reader task and registers it in
 * sStreamReaderInfo->streamTaskMap under the session key; any other request type continues with
 * the task already registered under that key. The task is removed from the map once the scan is
 * exhausted. A response carrying the encoded block, or just the error code on failure, is always
 * sent back.
 *
 * NOTE(review): the map is assumed to own registered tasks (nothing here releases them), so a
 * task that could not be registered is released explicitly below - confirm against the map setup.
 */
static int32_t vnodeProcessStreamTsdbMetaReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;
  SStreamTriggerReaderTaskInnerOptions options = {0};

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStreamReaderTaskInner* pTaskInner = NULL;
  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_META);

  if (req->base.type == STRIGGER_PULL_TSDB_META) {
    // First request of a session: the scan itself only needs the timestamp column.
    SStreamTriggerReaderTaskInnerOptions optionsTs = {0};

    STREAM_CHECK_RET_GOTO(createOptionsForTsdbMeta(&optionsTs, sStreamReaderInfo, req->tsdbMetaReq.startTime,
      req->tsdbMetaReq.endTime, req->tsdbMetaReq.gid, req->tsdbMetaReq.order, req->tsdbMetaReq.ver, true));
    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &optionsTs, &pTaskInner, NULL, sStreamReaderInfo->groupIdMap, &api));

    code = taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner));
    if (code != 0) {
      // The task never made it into the map, so nothing else will release it.
      lino = __LINE__;
      releaseStreamTask(&pTaskInner);
      goto end;
    }

    // The full meta schema is only used to shape the response block.
    STREAM_CHECK_RET_GOTO(createOptionsForTsdbMeta(&options, sStreamReaderInfo, req->tsdbMetaReq.startTime,
      req->tsdbMetaReq.endTime, req->tsdbMetaReq.gid, req->tsdbMetaReq.order, req->tsdbMetaReq.ver, false));
    STREAM_CHECK_RET_GOTO(createDataBlockForStream(options.schemas, &pTaskInner->pResBlockDst));
  } else {
    // Follow-up request: continue with the task registered by the first request.
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  }

  blockDataCleanup(pTaskInner->pResBlockDst);
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pTaskInner->pResBlockDst, STREAM_RETURN_ROWS_NUM));
  bool hasNext = true;
  while (true) {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
    if (!hasNext) {
      break;
    }
    // Only the block header is needed, so the block is released before it is read.
    pTaskInner->api.tsdReader.tsdReaderReleaseDataBlock(pTaskInner->pReader);
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);

    // Column order must match the schema built by createOptionsForTsdbMeta(..., onlyTs = false).
    int32_t index = 0;
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.window.skey));
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.window.ekey));
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.id.uid));
    if (sStreamReaderInfo->uidList == NULL) {
      STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.id.groupId));
    }
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.rows));

    stDebug("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
    pTaskInner->pResBlockDst->info.rows++;
    // Response is full: stop here, the task stays registered for the follow-up request.
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
      break;
    }
  }

  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
  if (!hasNext) {
    // Scan exhausted: the session context is no longer needed.
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  taosArrayDestroy(options.schemas);
  return code;
}
1349

1350
/*
 * Handle a TSDB "ts data" pull request.
 *
 * Builds a one-shot reader task from req->tsdbTsDataReq (ver, skey, ekey, suid, uid), merges every
 * data block the reader yields into pTaskInner->pResBlockDst, passes the merged block through
 * blockDataTransform() into a block created from sStreamReaderInfo->tsBlock, and serializes that
 * block as the response payload.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP carrying `code` is always sent, on success and on failure.
 * The reader task and the result block are released before returning.
 *
 * Returns 0 on success, otherwise the error code that was also placed in the response.
 */
static int32_t vnodeProcessStreamTsdbTsDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  SStreamReaderTaskInner* pTaskInner = NULL;
  void*                   buf = NULL;
  size_t                  size = 0;
  SSDataBlock*            pBlockRes = NULL;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  BUILD_OPTION(options, sStreamReaderInfo, req->tsdbTsDataReq.ver, true, TSDB_ORDER_ASC, req->tsdbTsDataReq.skey, req->tsdbTsDataReq.ekey,
               sStreamReaderInfo->triggerCols, false, STREAM_SCAN_ALL, 0, true, NULL);
  reSetUid(&options, req->tsdbTsDataReq.suid, req->tsdbTsDataReq.uid);
  SStorageAPI api = {0};
  initStorageAPI(&api);
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, NULL, &api));
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->tsBlock, false, &pBlockRes));

  while (1) {
    bool hasNext = false;
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
    if (!hasNext) {
      break;
    }
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);

    SSDataBlock* pBlock = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
    if (pBlock == NULL) {
      // nothing was produced for this step; do not hand a NULL block to the filter/merge helpers
      continue;
    }
    if (pBlock->info.rows > 0) {
      STREAM_CHECK_RET_GOTO(processTag(pVnode, sStreamReaderInfo->pExprInfo, sStreamReaderInfo->numOfExpr, &api, pBlock));
    }

    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, pTaskInner->pFilterInfo));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
    ST_TASK_DLOG("vgId:%d %s get  skey:%" PRId64 ", ekey:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
  }

  blockDataTransform(pBlockRes, pTaskInner->pResBlockDst);

  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  // the response is sent unconditionally; on failure buf is NULL and only the code is reported
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  blockDataDestroy(pBlockRes);

  releaseStreamTask(&pTaskInner);
  return code;
}
1407

1408
/*
 * Handle a TSDB "trigger data" pull request (first page or a follow-up page).
 *
 * When req->base.type is STRIGGER_PULL_TSDB_TRIGGER_DATA a new reader task is created and stored in
 * sStreamReaderInfo->streamTaskMap under a key derived from the session id; for any other type the
 * task previously stored under that key is looked up (TSDB_CODE_STREAM_NO_CONTEXT if missing).
 * One step of data is then read, tagged, filtered and merged into pTaskInner->pResBlockDst, which
 * is serialized as the response. When the reader reports no more data the task is removed from the map.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP carrying `code` is always sent, on success and on failure.
 *
 * Returns 0 on success, otherwise the error code that was also placed in the response.
 */
static int32_t vnodeProcessStreamTsdbTriggerDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  void*                   buf = NULL;
  size_t                  size = 0;
  SStreamReaderTaskInner* pTaskInner = NULL;
  // true while a freshly created task has not yet been handed over to streamTaskMap
  bool                    ownsTask = false;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  int64_t key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_TRIGGER_DATA);

  if (req->base.type == STRIGGER_PULL_TSDB_TRIGGER_DATA) {
    BUILD_OPTION(options, sStreamReaderInfo, req->tsdbTriggerDataReq.ver, true, req->tsdbTriggerDataReq.order, req->tsdbTriggerDataReq.startTime, INT64_MAX,
                 sStreamReaderInfo->triggerCols, false, (req->tsdbTriggerDataReq.gid != 0 ? STREAM_SCAN_GROUP_ONE_BY_ONE : STREAM_SCAN_ALL), 
                 req->tsdbTriggerDataReq.gid, true, NULL);
    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock,
                                           sStreamReaderInfo->groupIdMap, &api));
    ownsTask = true;
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
    ownsTask = false;  // the map now holds the task pointer
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
  } else {
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  }

  blockDataCleanup(pTaskInner->pResBlockDst);
  bool hasNext = true;
  while (1) {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
    if (!hasNext) {
      break;
    }
    // look the group id up once and stamp it on both the source and the destination block
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
    pTaskInner->pResBlockDst->info.id.groupId = pTaskInner->pResBlock->info.id.groupId;

    SSDataBlock* pBlock = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
    if (pBlock != NULL && pBlock->info.rows > 0) {
      STREAM_CHECK_RET_GOTO(
        processTag(pVnode, sStreamReaderInfo->pExprInfo, sStreamReaderInfo->numOfExpr, &pTaskInner->api, pBlock));
    }
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, pTaskInner->pFilterInfo));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
    ST_TASK_DLOG("vgId:%d %s get skey:%" PRId64 ", ekey:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
    // NOTE: this condition is always true, so at most one block is returned per request (kept as-is)
    if (pTaskInner->pResBlockDst->info.rows >= 0) { //todo
      break;
    }
  }

  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
  if (!hasNext) {
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
  }

end:
  if (ownsTask) {
    // the task was created but never stored in the map, so nobody else would release it
    releaseStreamTask(&pTaskInner);
  }
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);

  return code;
}
1478

1479
/*
 * Handle a TSDB "calc data" pull request (first page or a follow-up page).
 *
 * When req->base.type is STRIGGER_PULL_TSDB_CALC_DATA a new reader task for the requested group and
 * [skey, ekey] range is created and stored in sStreamReaderInfo->streamTaskMap under a key derived
 * from the session id; otherwise the stored task is looked up (TSDB_CODE_STREAM_NO_CONTEXT if
 * missing). Blocks are read, filtered and merged until the reader is exhausted or
 * STREAM_RETURN_ROWS_NUM rows were collected; the merged block is passed through
 * blockDataTransform() into a block created from sStreamReaderInfo->calcResBlock and serialized.
 * When the reader reports no more data the task is removed from the map.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP carrying `code` is always sent, on success and on failure.
 *
 * Returns 0 on success, otherwise the error code that was also placed in the response.
 */
static int32_t vnodeProcessStreamTsdbCalcDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  void*                   buf = NULL;
  size_t                  size = 0;
  SSDataBlock*            pBlockRes = NULL;
  SStreamReaderTaskInner* pTaskInner = NULL;
  // true while a freshly created task has not yet been handed over to streamTaskMap
  bool                    ownsTask = false;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, skey:%"PRId64",ekey:%"PRId64",gid:%"PRId64, TD_VID(pVnode), __func__, 
    req->tsdbCalcDataReq.skey, req->tsdbCalcDataReq.ekey, req->tsdbCalcDataReq.gid);

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->triggerCols, TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN);

  int64_t key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_CALC_DATA);

  if (req->base.type == STRIGGER_PULL_TSDB_CALC_DATA) {
    BUILD_OPTION(options, sStreamReaderInfo, req->tsdbCalcDataReq.ver, true, TSDB_ORDER_ASC, req->tsdbCalcDataReq.skey, req->tsdbCalcDataReq.ekey,
                 sStreamReaderInfo->triggerCols, false, STREAM_SCAN_GROUP_ONE_BY_ONE, req->tsdbCalcDataReq.gid, true, NULL);
    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, NULL, &api));
    ownsTask = true;

    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
    ownsTask = false;  // the map now holds the task pointer
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
  } else {
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  }

  blockDataCleanup(pTaskInner->pResBlockDst);
  bool hasNext = true;
  while (1) {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
    if (!hasNext) {
      break;
    }
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);

    SSDataBlock* pBlock = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, pTaskInner->pFilterInfo));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
    // page boundary: stop once enough rows were gathered, the task stays in the map for the next page
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
      break;
    }
  }

  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->calcResBlock, false, &pBlockRes));
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlockRes, pTaskInner->pResBlockDst->info.capacity));
  blockDataTransform(pBlockRes, pTaskInner->pResBlockDst);
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlockRes->info.rows);
  if (!hasNext) {
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
  }

end:
  if (ownsTask) {
    // the task was created but never stored in the map, so nobody else would release it
    releaseStreamTask(&pTaskInner);
  }
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  blockDataDestroy(pBlockRes);
  return code;
}
1547

1548
/*
 * Handle a TSDB data pull request for a single table identified by req->tsdbDataReq.uid
 * (first page or a follow-up page).
 *
 * When req->base.type is STRIGGER_PULL_TSDB_DATA a reader task is created from the request's column
 * ids, order, time range and version, its query condition is rebuilt and a tsdb reader is opened
 * for exactly that one table; the task is then stored in sStreamReaderInfo->streamTaskMap keyed by
 * the table uid. For any other type the stored task is looked up (TSDB_CODE_STREAM_NO_CONTEXT if
 * missing). Blocks are merged until the reader is exhausted or STREAM_RETURN_ROWS_NUM rows were
 * collected, and the merged block is serialized as the response. When the reader reports no more
 * data the task is removed from the map.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP carrying `code` is always sent, on success and on failure.
 *
 * Returns 0 on success, otherwise the error code that was also placed in the response.
 */
static int32_t vnodeProcessStreamTsdbVirtalDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  void*                   buf = NULL;
  size_t                  size = 0;
  SStreamReaderTaskInner* pTaskInner = NULL;
  // true while a freshly created task has not yet been handed over to streamTaskMap
  bool                    ownsTask = false;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  int64_t key = req->tsdbDataReq.uid;

  if (req->base.type == STRIGGER_PULL_TSDB_DATA) {
    SStreamTriggerReaderTaskInnerOptions options = {0};

    STREAM_CHECK_RET_GOTO(createOptionsForTsdbData(pVnode, &options, sStreamReaderInfo, req->tsdbDataReq.uid,
                                                   req->tsdbDataReq.cids, req->tsdbDataReq.order, req->tsdbDataReq.skey,
                                                   req->tsdbDataReq.ekey, req->tsdbDataReq.ver));
    reSetUid(&options, req->tsdbDataReq.suid, req->tsdbDataReq.uid);

    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, NULL, &api));
    // from here on any failure before taosHashPut succeeds must release the task (see end:)
    ownsTask = true;

    STableKeyInfo       keyInfo = {.uid = req->tsdbDataReq.uid};
    cleanupQueryTableDataCond(&pTaskInner->cond);
    STREAM_CHECK_RET_GOTO(qStreamInitQueryTableDataCond(&pTaskInner->cond, pTaskInner->options.order, pTaskInner->options.schemas,
                                                        pTaskInner->options.isSchema, pTaskInner->options.twindows,
                                                        pTaskInner->options.suid, pTaskInner->options.ver));
    STREAM_CHECK_RET_GOTO(pTaskInner->api.tsdReader.tsdReaderOpen(pVnode, &pTaskInner->cond, &keyInfo, 1, pTaskInner->pResBlock,
                                                             (void**)&pTaskInner->pReader, pTaskInner->idStr, NULL));

    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
    ownsTask = false;  // the map now holds the task pointer
    STREAM_CHECK_RET_GOTO(createOneDataBlock(pTaskInner->pResBlock, false, &pTaskInner->pResBlockDst));
  } else {
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  }

  blockDataCleanup(pTaskInner->pResBlockDst);
  bool hasNext = true;
  while (1) {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
    if (!hasNext) {
      break;
    }

    SSDataBlock* pBlock = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
    // page boundary: stop once enough rows were gathered, the task stays in the map for the next page
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
      break;
    }
  }
  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
  if (!hasNext) {
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
  }

end:
  if (ownsTask) {
    // the task was created but never stored in the map, so nobody else would release it
    releaseStreamTask(&pTaskInner);
  }
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);

  return code;
}
1619

1620
/*
 * Handle a WAL meta pull request.
 *
 * For a non-virtual-table reader (sStreamReaderInfo->uidList == NULL) a table list is built from the
 * reader's suid/uid, partition columns and tag conditions; for a virtual-table reader
 * sStreamReaderInfo->uidHash is used instead. scanWal() is then run from req->walMetaReq.lastVer
 * and fills a meta block, which is serialized as the response payload.
 *
 * If the resulting block has zero rows, the response carries TSDB_CODE_STREAM_NO_DATA and an
 * 8-byte payload holding the last version reported by scanWal(); the function itself then returns 0.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP is always sent, on success and on failure.
 *
 * Returns 0 on success or "no data", otherwise the error code that was also placed in the response.
 */
static int32_t vnodeProcessStreamWalMetaReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t      code = 0;
  int32_t      lino = 0;
  void*        buf = NULL;
  size_t       size = 0;
  SSDataBlock* pBlock = NULL;
  void*        pTableList = NULL;
  SNodeList*   groupNew = NULL;
  int64_t      lastVer = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, request paras lastVer:%" PRId64 ",ctime:%" PRId64, TD_VID(pVnode), __func__,
  req->walMetaReq.lastVer, req->walMetaReq.ctime);

  bool isVTable = sStreamReaderInfo->uidList != NULL;
  if (!isVTable) {
    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(nodesCloneList(sStreamReaderInfo->partitionCols, &groupNew));
    STREAM_CHECK_RET_GOTO(qStreamCreateTableListForReader(
        pVnode, sStreamReaderInfo->suid, sStreamReaderInfo->uid, sStreamReaderInfo->tableType,
        groupNew, false, sStreamReaderInfo->pTagCond, sStreamReaderInfo->pTagIndexCond, &api,
        &pTableList, sStreamReaderInfo->groupIdMap));
  }

  STREAM_CHECK_RET_GOTO(createBlockForWalMeta(&pBlock, isVTable));
  STREAM_CHECK_RET_GOTO(scanWal(pVnode, isVTable ? sStreamReaderInfo->uidHash : pTableList, isVTable, pBlock,
                                req->walMetaReq.lastVer, sStreamReaderInfo->deleteReCalc,
                                sStreamReaderInfo->deleteOutTbl, req->walMetaReq.ctime, &lastVer));

  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlock->info.rows);
  STREAM_CHECK_RET_GOTO(buildRsp(pBlock, &buf, &size));
  printDataBlock(pBlock, __func__, "");

end:
  // NOTE(review): an empty block overrides any earlier error code with NO_DATA, as before — confirm intended
  if (pBlock != NULL && pBlock->info.rows == 0) {
    // buildRsp may already have produced a payload for the empty block; free it instead of leaking it
    rpcFreeCont(buf);
    buf = rpcMallocCont(sizeof(int64_t));
    if (buf != NULL) {
      code = TSDB_CODE_STREAM_NO_DATA;
      *(int64_t *)buf = lastVer;
      size = sizeof(int64_t);
    } else {
      // allocation failed: report the error rather than writing through a NULL pointer
      code = terrno;
      size = 0;
    }
  }
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  // "no data" is only an error for the peer; locally the request was handled successfully
  if (code == TSDB_CODE_STREAM_NO_DATA){
    code = 0;
  }
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  nodesDestroyList(groupNew);
  blockDataDestroy(pBlock);
  qStreamDestroyTableList(pTableList);

  return code;
}
1675

1676
/*
 * Handle a WAL data pull request.
 *
 * Dispatches on req->walDataReq.base.type:
 *   STRIGGER_PULL_WAL_DATA         -> processWalVerDataVTable() with the request's column ids
 *   STRIGGER_PULL_WAL_CALC_DATA    -> processWalVerData() using sStreamReaderInfo->calcResBlock
 *   STRIGGER_PULL_WAL_TRIGGER_DATA -> processWalVerData() using sStreamReaderInfo->triggerResBlock
 *   STRIGGER_PULL_WAL_TS_DATA      -> processWalVerData() using sStreamReaderInfo->tsBlock
 * Any other type is rejected with TSDB_CODE_INVALID_PARA. The produced block is serialized as the
 * response payload.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP carrying `code` is always sent, on success and on failure.
 *
 * Returns 0 on success, otherwise the error code that was also placed in the response.
 */
static int32_t vnodeProcessStreamWalDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t      code = 0;
  int32_t      lino = 0;
  void*        buf = NULL;
  size_t       size = 0;
  SSDataBlock* pBlock = NULL;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, request type:%d skey:%" PRId64 ",ekey:%" PRId64 ",uid:%" PRId64 ",ver:%" PRId64, TD_VID(pVnode),
  __func__, req->walDataReq.base.type, req->walDataReq.skey, req->walDataReq.ekey, req->walDataReq.uid, req->walDataReq.ver);

  STimeWindow window = {.skey = req->walDataReq.skey, .ekey = req->walDataReq.ekey};
  if (req->walDataReq.base.type == STRIGGER_PULL_WAL_DATA){
    STREAM_CHECK_RET_GOTO(processWalVerDataVTable(pVnode, req->walDataReq.cids, req->walDataReq.ver,
      req->walDataReq.uid, &window, &pBlock));
  } else if (req->walDataReq.base.type == STRIGGER_PULL_WAL_CALC_DATA){
    STREAM_CHECK_RET_GOTO(processWalVerData(pVnode, sStreamReaderInfo, req->walDataReq.ver, false,
      req->walDataReq.uid, &window, sStreamReaderInfo->calcResBlock, &pBlock));
  } else if (req->walDataReq.base.type == STRIGGER_PULL_WAL_TRIGGER_DATA) {
    STREAM_CHECK_RET_GOTO(processWalVerData(pVnode, sStreamReaderInfo, req->walDataReq.ver, true,
      req->walDataReq.uid, &window, sStreamReaderInfo->triggerResBlock, &pBlock));
  } else if (req->walDataReq.base.type == STRIGGER_PULL_WAL_TS_DATA){
    STREAM_CHECK_RET_GOTO(processWalVerData(pVnode, sStreamReaderInfo, req->walDataReq.ver, false,
      req->walDataReq.uid, &window, sStreamReaderInfo->tsBlock, &pBlock));
  } else {
    // unknown request type: no block is produced, so fail instead of dereferencing NULL below
    STREAM_CHECK_CONDITION_GOTO(true, TSDB_CODE_INVALID_PARA);
  }

  // pBlock is dereferenced right below; guard against a handler that succeeded without a block
  STREAM_CHECK_NULL_GOTO(pBlock, TSDB_CODE_INTERNAL_ERROR);

  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlock->info.rows);
  printDataBlock(pBlock, __func__, "");
  STREAM_CHECK_RET_GOTO(buildRsp(pBlock, &buf, &size));
end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);

  blockDataDestroy(pBlock);
  return code;
}
1716

1717
/*
 * Handle a "group column value" pull request.
 *
 * Looks up req->groupColValueReq.gid in sStreamReaderInfo->groupIdMap (TSDB_CODE_STREAM_NO_CONTEXT
 * if absent), wraps the stored array in an SStreamGroupInfo and serializes it with
 * tSerializeSStreamGroupInfo() into an RPC buffer: a first call with a NULL buffer obtains the
 * required length, a second call fills the allocated buffer.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP carrying `code` is always sent; on failure the payload is freed
 * and the response carries no content.
 *
 * Returns 0 on success, otherwise the error code that was also placed in the response.
 */
static int32_t vnodeProcessStreamGroupColValueReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;
  // signed length so that a negative (error) result of the serializer is actually detectable;
  // storing it straight into the unsigned `size` made every `< 0` check always false
  int32_t len = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, request gid:%" PRId64, TD_VID(pVnode), __func__, req->groupColValueReq.gid);

  // NOTE(review): the key length is POINTER_BYTES although the key is the gid value — presumably
  // this matches how groupIdMap is populated; confirm on platforms where pointers are not 8 bytes
  SArray** gInfo = taosHashGet(sStreamReaderInfo->groupIdMap, &req->groupColValueReq.gid, POINTER_BYTES);
  STREAM_CHECK_NULL_GOTO(gInfo, TSDB_CODE_STREAM_NO_CONTEXT);
  SStreamGroupInfo pGroupInfo = {0};
  pGroupInfo.gInfo = *gInfo;

  len = tSerializeSStreamGroupInfo(NULL, 0, &pGroupInfo, TD_VID(pVnode));
  STREAM_CHECK_CONDITION_GOTO(len < 0, len);
  buf = rpcMallocCont(len);
  STREAM_CHECK_NULL_GOTO(buf, terrno);
  len = tSerializeSStreamGroupInfo(buf, len, &pGroupInfo, TD_VID(pVnode));
  STREAM_CHECK_CONDITION_GOTO(len < 0, len);
  size = (size_t)len;
end:
  if (code != 0) {
    rpcFreeCont(buf);
    buf = NULL;
    size = 0;
  }
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);

  return code;
}
1751

1752
/*
 * Handle a "virtual table info" pull request.
 *
 * Builds the table list for the reader (suid/uid, partition columns, tag conditions) and, for every
 * table in it, reads the table's meta entry and appends a VTableInfo (uid, group id, column
 * references) to the response:
 *   - if the request asks for exactly one column and it is the primary timestamp column, all column
 *     references of the table are copied;
 *   - otherwise one slot per requested column id is allocated (zero-initialized) and the slot is
 *     filled with the table's matching column reference that has `hasRef` set, if any.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP carrying `code` is always sent, on success and on failure.
 *
 * Returns 0 on success, otherwise the error code that was also placed in the response.
 */
static int32_t vnodeProcessStreamVTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t              code = 0;
  int32_t              lino = 0;
  void*                buf = NULL;
  size_t               size = 0;
  SStreamMsgVTableInfo vTableInfo = {0};
  SMetaReader          metaReader = {0};
  SNodeList*           groupNew = NULL;
  void*                pTableList = NULL;
  SStorageAPI api = {0};
  initStorageAPI(&api);

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  STREAM_CHECK_RET_GOTO(nodesCloneList(sStreamReaderInfo->partitionCols, &groupNew));
  STREAM_CHECK_RET_GOTO(qStreamCreateTableListForReader(
      pVnode, sStreamReaderInfo->suid, sStreamReaderInfo->uid, sStreamReaderInfo->tableType,
      groupNew, true, sStreamReaderInfo->pTagCond, sStreamReaderInfo->pTagIndexCond, &api,
      &pTableList, sStreamReaderInfo->groupIdMap));

  SArray* cids = req->virTableInfoReq.cids;
  STREAM_CHECK_NULL_GOTO(cids, terrno);

  SArray* pTableListArray = qStreamGetTableArrayList(pTableList);
  STREAM_CHECK_NULL_GOTO(pTableListArray, terrno);

  vTableInfo.infos = taosArrayInit(taosArrayGetSize(pTableListArray), sizeof(VTableInfo));
  STREAM_CHECK_NULL_GOTO(vTableInfo.infos, terrno);
  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);

  for (size_t i = 0; i < taosArrayGetSize(pTableListArray); i++) {
    STableKeyInfo* pKeyInfo = taosArrayGet(pTableListArray, i);
    if (pKeyInfo == NULL) {
      continue;
    }

    // load the meta entry first: everything below reads metaReader.me, so a failed lookup must
    // not be ignored, and no half-filled element is left in the response array on failure
    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, pKeyInfo->uid));

    VTableInfo* vTable = taosArrayReserve(vTableInfo.infos, 1);
    STREAM_CHECK_NULL_GOTO(vTable, terrno);
    vTable->uid = pKeyInfo->uid;
    vTable->gId = pKeyInfo->groupId;

    size_t numOfCids = taosArrayGetSize(cids);
    if (numOfCids == 1 && *(col_id_t*)taosArrayGet(cids, 0) == PRIMARYKEY_TIMESTAMP_COL_ID){
      // only the timestamp column was requested: return every column reference of the table
      vTable->cols.pColRef = taosMemoryCalloc(metaReader.me.colRef.nCols, sizeof(SColRef));
      if (metaReader.me.colRef.nCols > 0) {
        STREAM_CHECK_NULL_GOTO(vTable->cols.pColRef, terrno);
      }
      vTable->cols.nCols = metaReader.me.colRef.nCols;
      vTable->cols.version = metaReader.me.colRef.version;
      for (size_t j = 0; j < metaReader.me.colRef.nCols; j++) {
        memcpy(vTable->cols.pColRef + j, &metaReader.me.colRef.pColRef[j], sizeof(SColRef));
      }
    } else {
      vTable->cols.pColRef = taosMemoryCalloc(numOfCids, sizeof(SColRef));
      if (numOfCids > 0) {
        STREAM_CHECK_NULL_GOTO(vTable->cols.pColRef, terrno);
      }
      vTable->cols.nCols = numOfCids;
      vTable->cols.version = metaReader.me.colRef.version;
      // slot k corresponds to the k-th requested column id; slots without a match stay zeroed
      for (size_t k = 0; k < numOfCids; k++) {
        for (size_t j = 0; j < metaReader.me.colRef.nCols; j++) {
          if (metaReader.me.colRef.pColRef[j].hasRef &&
              metaReader.me.colRef.pColRef[j].id == *(col_id_t*)taosArrayGet(cids, k)) {
            memcpy(vTable->cols.pColRef + k, &metaReader.me.colRef.pColRef[j], sizeof(SColRef));
            break;
          }
        }
      }
    }
    tDecoderClear(&metaReader.coder);
  }
  ST_TASK_DLOG("vgId:%d %s end", TD_VID(pVnode), __func__);
  STREAM_CHECK_RET_GOTO(buildVTableInfoRsp(&vTableInfo, &buf, &size));

end:
  nodesDestroyList(groupNew);
  qStreamDestroyTableList(pTableList);
  tDestroySStreamMsgVTableInfo(&vTableInfo);
  api.metaReaderFn.clearReader(&metaReader);
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  return code;
}
1832

1833
/*
 * Handle an "original table info" pull request.
 *
 * For every (refTableName, refColName) pair in req->origTableInfoReq.cols the referenced table is
 * looked up by name and an OTableInfoRsp is appended to the response with the table uid, its super
 * table uid (0 for a normal table) and the column id whose schema name equals refColName. For a
 * child table the schema is taken from its super table's entry. A column that cannot be found is
 * only logged; a table that is neither a child nor a normal table fails the request with
 * TSDB_CODE_INVALID_PARA.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP carrying `code` is always sent, on success and on failure.
 *
 * Returns 0 on success, otherwise the error code that was also placed in the response.
 */
static int32_t vnodeProcessStreamOTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req) {
  int32_t                   code = 0;
  int32_t                   lino = 0;
  void*                     buf = NULL;
  size_t                    size = 0;
  SSTriggerOrigTableInfoRsp oTableInfo = {0};
  SMetaReader               metaReader = {0};
  int64_t streamId = req->base.streamId;
  stsDebug("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStorageAPI api = {0};
  initStorageAPI(&api);

  SArray* cols = req->origTableInfoReq.cols;
  STREAM_CHECK_NULL_GOTO(cols, terrno);

  oTableInfo.cols = taosArrayInit(taosArrayGetSize(cols), sizeof(OTableInfoRsp));

  STREAM_CHECK_NULL_GOTO(oTableInfo.cols, terrno);

  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
  for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
    OTableInfo*    oInfo = taosArrayGet(cols, i);
    OTableInfoRsp* vTableInfo = taosArrayReserve(oTableInfo.cols, 1);
    STREAM_CHECK_NULL_GOTO(oInfo, terrno);
    STREAM_CHECK_NULL_GOTO(vTableInfo, terrno);
    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByName(&metaReader, oInfo->refTableName));
    vTableInfo->uid = metaReader.me.uid;
    stsDebug("vgId:%d %s uid:%"PRId64, TD_VID(pVnode), __func__, vTableInfo->uid);

    SSchemaWrapper* sSchemaWrapper = NULL;
    if (metaReader.me.type == TD_CHILD_TABLE) {
      // a child table has no schema of its own here: reload the reader with the super table entry
      int64_t suid = metaReader.me.ctbEntry.suid;
      vTableInfo->suid = suid;
      tDecoderClear(&metaReader.coder);
      STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, suid));
      sSchemaWrapper = &metaReader.me.stbEntry.schemaRow;
    } else if (metaReader.me.type == TD_NORMAL_TABLE) {
      vTableInfo->suid = 0;
      sSchemaWrapper = &metaReader.me.ntbEntry.schemaRow;
    } else {
      stError("invalid table type:%d", metaReader.me.type);
    }
    // no schema is available for an unsupported table type; fail instead of dereferencing NULL
    STREAM_CHECK_NULL_GOTO(sSchemaWrapper, TSDB_CODE_INVALID_PARA);

    for (size_t j = 0; j < sSchemaWrapper->nCols; j++) {
      SSchema* s = sSchemaWrapper->pSchema + j;
      if (strcmp(s->name, oInfo->refColName) == 0) {
        vTableInfo->cid = s->colId;
        break;
      }
    }
    // cid is never assigned when no schema column matched; it is then expected to still be 0
    if (vTableInfo->cid == 0) {
      stError("vgId:%d %s, not found col %s in table %s", TD_VID(pVnode), __func__, oInfo->refColName,
              oInfo->refTableName);
    }
    tDecoderClear(&metaReader.coder);
  }

  STREAM_CHECK_RET_GOTO(buildOTableInfoRsp(&oTableInfo, &buf, &size));

end:
  tDestroySTriggerOrigTableInfoRsp(&oTableInfo);
  api.metaReaderFn.clearReader(&metaReader);
  STREAM_PRINT_LOG_END(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  return code;
}
1902

1903
/*
 * Handle a STRIGGER_PULL_VTABLE_PSEUDO_COL request.
 *
 * Looks up the virtual table identified by req->virTablePseudoColReq.uid and builds a one-row
 * data block with one column per requested id in req->virTablePseudoColReq.cids:
 *   - id -1 yields a binary column filled with the table's name;
 *   - any other id is resolved against the super table's tag schema and filled with the child
 *     table's tag value (a NULL-typed column is emitted when the id is not in the tag schema).
 * A virtual normal table only supports the table-name column, so its first requested id must be -1.
 *
 * The block is serialized with buildRsp() and always answered through tmsgSendRsp() as a
 * TDMT_STREAM_TRIGGER_PULL_RSP, carrying `code` on failure.
 *
 * @param pVnode vnode that owns the meta store being read
 * @param pMsg   incoming RPC message; its `info` is reused for the response
 * @param req    decoded pull request
 * @return 0 on success, otherwise the error code that was also placed in the response
 */
static int32_t vnodeProcessStreamVTableTagInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req) {
  int32_t      code = 0;
  int32_t      lino = 0;
  void*        buf = NULL;
  size_t       size = 0;
  SSDataBlock* pBlock = NULL;
  // Var-length tag buffer that still has to be released; kept at function scope so the
  // error path (goto end) can free it as well.
  char*        pendingTagData = NULL;

  SMetaReader metaReader = {0};
  SMetaReader metaReaderStable = {0};
  int64_t     streamId = req->base.streamId;
  stsDebug("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStorageAPI api = {0};
  initStorageAPI(&api);

  SArray* cols = req->virTablePseudoColReq.cids;
  STREAM_CHECK_NULL_GOTO(cols, terrno);

  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
  STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, req->virTablePseudoColReq.uid));

  STREAM_CHECK_CONDITION_GOTO(metaReader.me.type != TD_VIRTUAL_CHILD_TABLE && metaReader.me.type != TD_VIRTUAL_NORMAL_TABLE, TSDB_CODE_INVALID_PARA);

  STREAM_CHECK_RET_GOTO(createDataBlock(&pBlock));
  if (metaReader.me.type == TD_VIRTUAL_NORMAL_TABLE) {
    // Reject an empty list or a first id other than -1. The `||` short-circuits, so
    // taosArrayGet(cols, 0) is only dereferenced when the array has at least one element.
    STREAM_CHECK_CONDITION_GOTO(taosArrayGetSize(cols) < 1 || *(col_id_t*)taosArrayGet(cols, 0) != -1, TSDB_CODE_INVALID_PARA);
    SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN, -1);
    STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, 1));
    pBlock->info.rows = 1;
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, 0);
    STREAM_CHECK_NULL_GOTO(pDst, terrno);
    STREAM_CHECK_RET_GOTO(varColSetVarData(pDst, 0, metaReader.me.name, strlen(metaReader.me.name), false));
  } else if (metaReader.me.type == TD_VIRTUAL_CHILD_TABLE) {
    int64_t suid = metaReader.me.ctbEntry.suid;
    // Release the child reader's lock before a second reader is opened on the super table.
    api.metaReaderFn.readerReleaseLock(&metaReader);
    api.metaReaderFn.initReader(&metaReaderStable, pVnode, META_READER_LOCK, &api.metaFn);

    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReaderStable, suid));
    SSchemaWrapper* sSchemaWrapper = &metaReaderStable.me.stbEntry.schemaTag;

    // Pass 1: declare one output column per requested id.
    for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
      col_id_t* id = taosArrayGet(cols, i);
      STREAM_CHECK_NULL_GOTO(id, terrno);
      if (*id == -1) {
        SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN, -1);
        STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
        continue;
      }
      size_t j = 0;
      for (; j < sSchemaWrapper->nCols; j++) {
        SSchema* s = sSchemaWrapper->pSchema + j;
        if (s->colId == *id) {
          SColumnInfoData idata = createColumnInfoData(s->type, s->bytes, s->colId);
          STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
          break;
        }
      }
      if (j == sSchemaWrapper->nCols) {
        // Id not present in the tag schema: keep the column position with a NULL-typed column.
        SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_NULL, CHAR_BYTES, *id);
        STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
      }
    }
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, 1));
    pBlock->info.rows = 1;

    // Pass 2: fill row 0 of every declared column.
    for (size_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
      STREAM_CHECK_NULL_GOTO(pDst, terrno);

      if (pDst->info.colId == -1) {
        STREAM_CHECK_RET_GOTO(varColSetVarData(pDst, 0, metaReader.me.name, strlen(metaReader.me.name), false));
        continue;
      }
      if (pDst->info.type == TSDB_DATA_TYPE_NULL) {
        STREAM_CHECK_RET_GOTO(colDataSetVal(pDst, 0, NULL, true));
        continue;
      }

      const bool isJson = (pDst->info.type == TSDB_DATA_TYPE_JSON);

      STagVal val = {0};
      val.cid = pDst->info.colId;
      const char* p = api.metaFn.extractTagVal(metaReader.me.ctbEntry.pTags, pDst->info.type, &val);

      char* data = NULL;
      if (!isJson && p != NULL) {
        data = tTagValToData((const STagVal*)p, false);
        // Only var-length values are freed after use (same ownership rule as before); remember
        // the buffer so it is also released when colDataSetVal() fails and we jump to `end`.
        if (data != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
          pendingTagData = data;
        }
      } else {
        data = (char*)p;
      }

      STREAM_CHECK_RET_GOTO(colDataSetVal(pDst, 0, data, (data == NULL) || (isJson && tTagIsJsonNull(data))));

      taosMemoryFree(pendingTagData);
      pendingTagData = NULL;
    }
  } else {
    // Defensive: the table type was already validated above.
    stError("vgId:%d %s, invalid table type:%d", TD_VID(pVnode), __func__, metaReader.me.type);
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  stsDebug("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlock->info.rows);
  printDataBlock(pBlock, __func__, "");
  STREAM_CHECK_RET_GOTO(buildRsp(pBlock, &buf, &size));

end:
  taosMemoryFree(pendingTagData);
  api.metaReaderFn.clearReader(&metaReaderStable);
  api.metaReaderFn.clearReader(&metaReader);
  STREAM_PRINT_LOG_END(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  blockDataDestroy(pBlock);
  return code;
}
2020

2021
/*
 * Handle a TDMT_STREAM_FETCH message: run the stream calc task selected by
 * (queryId, taskId, execId) and reply with the produced data blocks.
 *
 * When the request asks for a reset, or no executor task is cached yet, the cached task is
 * destroyed and rebuilt from the calc scan plan with the runtime info carried by the request.
 * The result blocks are optionally filtered (external-window streams), serialized with
 * streamBuildFetchRsp() and always answered through tmsgSendRsp() as a TDMT_STREAM_FETCH_RSP,
 * carrying `code` on failure.
 *
 * @param pVnode vnode the task reads from
 * @param pMsg   incoming RPC message; its `info` is reused for the response
 * @return 0 on success, otherwise the error code that was also placed in the response
 */
static int32_t vnodeProcessStreamFetchMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;
  void*   taskAddr = NULL;
  SArray* pResList = NULL;

  SResFetchReq req = {0};
  STREAM_CHECK_CONDITION_GOTO(tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0,
                              TSDB_CODE_QRY_INVALID_INPUT);
  SArray* calcInfoList = (SArray*)qStreamGetReaderInfo(req.queryId, req.taskId, &taskAddr);
  STREAM_CHECK_NULL_GOTO(calcInfoList, terrno);

  STREAM_CHECK_CONDITION_GOTO(req.execId < 0, TSDB_CODE_INVALID_PARA);
  SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo = taosArrayGetP(calcInfoList, req.execId);
  STREAM_CHECK_NULL_GOTO(sStreamReaderCalcInfo, terrno);
  void* pTask = sStreamReaderCalcInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, execId:%d, reset:%d, pTaskInfo:%p, scan type:%d", TD_VID(pVnode), __func__, req.execId, req.reset,
               sStreamReaderCalcInfo->pTaskInfo, nodeType(sStreamReaderCalcInfo->calcAst->pNode));

  if (req.reset || sStreamReaderCalcInfo->pTaskInfo == NULL) {
    // The runtime function info is dereferenced unconditionally below; fail the request
    // instead of crashing when the message did not carry it.
    STREAM_CHECK_NULL_GOTO(req.pStRtFuncInfo, TSDB_CODE_INVALID_PARA);

    qDestroyTask(sStreamReaderCalcInfo->pTaskInfo);
    // Clear the cached handle right away: any failure before the task is re-created jumps to
    // `end`, and a stale pointer would be reused or destroyed again by the next request.
    sStreamReaderCalcInfo->pTaskInfo = NULL;

    int64_t uid = 0;
    if (req.dynTbname) {
      // Take the uid from the first partition value flagged as a table name.
      SArray* vals = req.pStRtFuncInfo->pStreamPartColVals;
      for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
        SStreamGroupValue* pValue = taosArrayGet(vals, i);
        if (pValue != NULL && pValue->isTbname) {
          uid = pValue->uid;
          break;
        }
      }
    }

    SReadHandle handle = {0};
    handle.vnode = pVnode;
    handle.uid = uid;

    initStorageAPI(&handle.api);
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(sStreamReaderCalcInfo->calcAst->pNode) ||
        QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == nodeType(sStreamReaderCalcInfo->calcAst->pNode)) {
      STimeRangeNode* node = (STimeRangeNode*)((STableScanPhysiNode*)(sStreamReaderCalcInfo->calcAst->pNode))->pTimeRange;
      if (node != NULL) {
        STREAM_CHECK_RET_GOTO(processCalaTimeRange(sStreamReaderCalcInfo, &req, node, &handle));
      }
    }

    TSWAP(sStreamReaderCalcInfo->rtInfo.funcInfo, *req.pStRtFuncInfo);
    handle.streamRtInfo = &sStreamReaderCalcInfo->rtInfo;

    // The executor task is always rebuilt here rather than reset in place.
    STREAM_CHECK_RET_GOTO(qCreateStreamExecTaskInfo(&sStreamReaderCalcInfo->pTaskInfo,
                                                    sStreamReaderCalcInfo->calcScanPlan, &handle, NULL, TD_VID(pVnode),
                                                    req.taskId));

    STREAM_CHECK_RET_GOTO(qSetTaskId(sStreamReaderCalcInfo->pTaskInfo, req.taskId, req.queryId));
  }

  if (req.pOpParam != NULL) {
    qUpdateOperatorParam(sStreamReaderCalcInfo->pTaskInfo, req.pOpParam);
  }

  pResList = taosArrayInit(4, POINTER_BYTES);
  STREAM_CHECK_NULL_GOTO(pResList, terrno);
  uint64_t ts = 0;
  bool     hasNext = false;
  STREAM_CHECK_RET_GOTO(qExecTaskOpt(sStreamReaderCalcInfo->pTaskInfo, pResList, &ts, &hasNext, NULL, req.pOpParam != NULL));

  for (size_t i = 0; i < taosArrayGetSize(pResList); i++) {
    SSDataBlock* pBlock = taosArrayGetP(pResList, i);
    if (pBlock == NULL) continue;
    printDataBlock(pBlock, __func__, "fetch");
    if (sStreamReaderCalcInfo->rtInfo.funcInfo.withExternalWindow) {
      STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, sStreamReaderCalcInfo->pFilterInfo));
      printDataBlock(pBlock, __func__, "fetch filter");
    }
  }

  STREAM_CHECK_RET_GOTO(streamBuildFetchRsp(pResList, hasNext, &buf, &size, pVnode->config.tsdbCfg.precision));
  ST_TASK_DLOG("vgId:%d %s end:", TD_VID(pVnode), __func__);

end:
  taosArrayDestroy(pResList);
  streamReleaseTask(taskAddr);

  STREAM_PRINT_LOG_END(code, lino);
  SRpcMsg rsp = {.msgType = TDMT_STREAM_FETCH_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  tDestroySResFetchReq(&req);
  return code;
}
2116

2117
/*
 * Entry point for messages taken from the stream reader queue.
 *
 * - If the vnode is not ready for reads, the message is redirected and 0 is returned.
 * - TDMT_STREAM_FETCH is handed to vnodeProcessStreamFetchMsg() and its result returned as is.
 * - TDMT_STREAM_TRIGGER_PULL is decoded (payload follows an SMsgHead) and dispatched on
 *   req.base.type to the matching per-request handler.
 * - Any other message type, or an unknown pull type, yields TSDB_CODE_APP_ERROR.
 *
 * @param pVnode vnode the message is addressed to
 * @param pMsg   message taken from the queue
 * @return 0 or the error code produced while decoding / handling the message
 */
int32_t vnodeProcessStreamReaderMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  int32_t                   code = 0;
  int32_t                   lino = 0;
  SSTriggerPullRequestUnion req = {0};
  void*                     taskAddr = NULL;

  vDebug("vgId:%d, msg:%p in stream reader queue is processing", pVnode->config.vgId, pMsg);
  if (!syncIsReadyForRead(pVnode->sync)) {
    vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
    return 0;
  }

  // Fetch messages have their own handler and bypass the shared cleanup below.
  if (pMsg->msgType == TDMT_STREAM_FETCH) {
    return vnodeProcessStreamFetchMsg(pVnode, pMsg);
  }

  if (pMsg->msgType != TDMT_STREAM_TRIGGER_PULL) {
    vError("unknown msg type:%d in stream reader queue", pMsg->msgType);
    code = TSDB_CODE_APP_ERROR;
    goto end;
  }

  // The pull request body starts right after the message head.
  void*   pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t len = pMsg->contLen - sizeof(SMsgHead);
  STREAM_CHECK_RET_GOTO(tDeserializeSTriggerPullRequest(pReq, len, &req));
  stDebug("vgId:%d %s start, type:%d, streamId:%" PRIx64 ", readerTaskId:%" PRIx64 ", sessionId:%" PRIx64,
          TD_VID(pVnode), __func__, req.base.type, req.base.streamId, req.base.readerTaskId, req.base.sessionId);

  // Every pull type except STRIGGER_PULL_OTABLE_INFO looks up the reader info of its task.
  SStreamTriggerReaderInfo* sStreamReaderInfo = NULL;
  if (STRIGGER_PULL_OTABLE_INFO != req.base.type) {
    sStreamReaderInfo = qStreamGetReaderInfo(req.base.streamId, req.base.readerTaskId, &taskAddr);
  }

  switch (req.base.type) {
    case STRIGGER_PULL_SET_TABLE:
      code = vnodeProcessStreamSetTableReq(pVnode, pMsg, &req, sStreamReaderInfo);
      break;

    case STRIGGER_PULL_LAST_TS:
      code = vnodeProcessStreamLastTsReq(pVnode, pMsg, &req, sStreamReaderInfo);
      break;

    case STRIGGER_PULL_FIRST_TS:
      code = vnodeProcessStreamFirstTsReq(pVnode, pMsg, &req, sStreamReaderInfo);
      break;

    case STRIGGER_PULL_TSDB_META:
    case STRIGGER_PULL_TSDB_META_NEXT:
      code = vnodeProcessStreamTsdbMetaReq(pVnode, pMsg, &req, sStreamReaderInfo);
      break;

    case STRIGGER_PULL_TSDB_TS_DATA:
      code = vnodeProcessStreamTsdbTsDataReq(pVnode, pMsg, &req, sStreamReaderInfo);
      break;

    case STRIGGER_PULL_TSDB_TRIGGER_DATA:
    case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT:
      code = vnodeProcessStreamTsdbTriggerDataReq(pVnode, pMsg, &req, sStreamReaderInfo);
      break;

    case STRIGGER_PULL_TSDB_CALC_DATA:
    case STRIGGER_PULL_TSDB_CALC_DATA_NEXT:
      code = vnodeProcessStreamTsdbCalcDataReq(pVnode, pMsg, &req, sStreamReaderInfo);
      break;

    case STRIGGER_PULL_TSDB_DATA:
    case STRIGGER_PULL_TSDB_DATA_NEXT:
      code = vnodeProcessStreamTsdbVirtalDataReq(pVnode, pMsg, &req, sStreamReaderInfo);
      break;

    case STRIGGER_PULL_WAL_META:
      code = vnodeProcessStreamWalMetaReq(pVnode, pMsg, &req, sStreamReaderInfo);
      break;

    case STRIGGER_PULL_WAL_TS_DATA:
    case STRIGGER_PULL_WAL_TRIGGER_DATA:
    case STRIGGER_PULL_WAL_CALC_DATA:
    case STRIGGER_PULL_WAL_DATA:
      code = vnodeProcessStreamWalDataReq(pVnode, pMsg, &req, sStreamReaderInfo);
      break;

    case STRIGGER_PULL_GROUP_COL_VALUE:
      code = vnodeProcessStreamGroupColValueReq(pVnode, pMsg, &req, sStreamReaderInfo);
      break;

    case STRIGGER_PULL_VTABLE_INFO:
      code = vnodeProcessStreamVTableInfoReq(pVnode, pMsg, &req, sStreamReaderInfo);
      break;

    case STRIGGER_PULL_VTABLE_PSEUDO_COL:
      code = vnodeProcessStreamVTableTagInfoReq(pVnode, pMsg, &req);
      break;

    case STRIGGER_PULL_OTABLE_INFO:
      code = vnodeProcessStreamOTableInfoReq(pVnode, pMsg, &req);
      break;

    default:
      vError("unknown inner msg type:%d in stream reader queue", req.base.type);
      code = TSDB_CODE_APP_ERROR;
      break;
  }

end:
  streamReleaseTask(taskAddr);
  tDestroySTriggerPullRequest(&req);
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc