• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4729

12 Sep 2025 02:34AM UTC coverage: 58.085% (-1.0%) from 59.125%
#4729

push

travis-ci

web-flow
docs: optimize taosd config parameters doc better (#32964)

133518 of 292959 branches covered (45.58%)

Branch coverage included in aggregate %.

201933 of 284559 relevant lines covered (70.96%)

5466318.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.38
/source/dnode/vnode/src/vnd/vnodeStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "scalar.h"
17
#include "streamReader.h"
18
#include "tdatablock.h"
19
#include "tdb.h"
20
#include "tencode.h"
21
#include "tglobal.h"
22
#include "tmsg.h"
23
#include "vnd.h"
24
#include "vnode.h"
25
#include "vnodeInt.h"
26

27
#define BUILD_OPTION(options, sStreamInfo, _ver, _groupSort, _order, startTime, endTime, _schemas, _isSchema, _scanMode, \
28
                     _gid, _initReader, _uidList)                                                                        \
29
  SStreamTriggerReaderTaskInnerOptions options = {.suid = (_uidList == NULL ? sStreamInfo->suid : 0),              \
30
                                                  .uid = sStreamInfo->uid,                                         \
31
                                                  .ver = _ver,                                                     \
32
                                                  .tableType = sStreamInfo->tableType,                             \
33
                                                  .groupSort = _groupSort,                                         \
34
                                                  .order = _order,                                                 \
35
                                                  .twindows = {.skey = startTime, .ekey = endTime},                \
36
                                                  .pTagCond = sStreamInfo->pTagCond,                               \
37
                                                  .pTagIndexCond = sStreamInfo->pTagIndexCond,                     \
38
                                                  .pConditions = sStreamInfo->pConditions,                         \
39
                                                  .partitionCols = sStreamInfo->partitionCols,                     \
40
                                                  .schemas = _schemas,                                             \
41
                                                  .isSchema = _isSchema,                                           \
42
                                                  .scanMode = _scanMode,                                           \
43
                                                  .gid = _gid,                                                     \
44
                                                  .initReader = _initReader,                                       \
45
                                                  .uidList = _uidList};
46

47
static int64_t getSessionKey(int64_t session, int64_t type) { return (session | (type << 32)); }
29,721✔
48

49
static int32_t buildScheamFromMeta(SVnode* pVnode, int64_t uid, SArray** schemas);
50

51
static int32_t addColData(SSDataBlock* pResBlock, int32_t index, void* data) {
95,846✔
52
  SColumnInfoData* pSrc = taosArrayGet(pResBlock->pDataBlock, index);
95,846✔
53
  if (pSrc == NULL) {
95,808!
54
    return terrno;
×
55
  }
56

57
  memcpy(pSrc->pData + pResBlock->info.rows * pSrc->info.bytes, data, pSrc->info.bytes);
95,837✔
58
  return 0;
95,837✔
59
}
60

61
static int32_t getTableDataInfo(SStreamReaderTaskInner* pTask, bool* hasNext) {
35,091✔
62
  int32_t code = pTask->api.tsdReader.tsdNextDataBlock(pTask->pReader, hasNext);
35,091✔
63
  if (code != TSDB_CODE_SUCCESS) {
35,086✔
64
    pTask->api.tsdReader.tsdReaderReleaseDataBlock(pTask->pReader);
6✔
65
  }
66

67
  return code;
35,086✔
68
}
69

70
static int32_t getTableData(SStreamReaderTaskInner* pTask, SSDataBlock** ppRes) {
3,385✔
71
  return pTask->api.tsdReader.tsdReaderRetrieveDataBlock(pTask->pReader, ppRes, NULL);
3,385✔
72
}
73

74
static int32_t buildOTableInfoRsp(const SSTriggerOrigTableInfoRsp* rsp, void** data, size_t* size) {
140✔
75
  int32_t code = 0;
140✔
76
  int32_t lino = 0;
140✔
77
  void*   buf = NULL;
140✔
78
  int32_t len = tSerializeSTriggerOrigTableInfoRsp(NULL, 0, rsp);
140✔
79
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
140!
80
  buf = rpcMallocCont(len);
140✔
81
  STREAM_CHECK_NULL_GOTO(buf, terrno);
140!
82
  int32_t actLen = tSerializeSTriggerOrigTableInfoRsp(buf, len, rsp);
140✔
83
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
140!
84
  *data = buf;
140✔
85
  *size = len;
140✔
86
  buf = NULL;
140✔
87
end:
140✔
88
  rpcFreeCont(buf);
140✔
89
  return code;
140✔
90
}
91

92
static int32_t buildVTableInfoRsp(const SStreamMsgVTableInfo* rsp, void** data, size_t* size) {
556✔
93
  int32_t code = 0;
556✔
94
  int32_t lino = 0;
556✔
95
  void*   buf = NULL;
556✔
96
  int32_t len = tSerializeSStreamMsgVTableInfo(NULL, 0, rsp);
556✔
97
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
556!
98
  buf = rpcMallocCont(len);
556✔
99
  STREAM_CHECK_NULL_GOTO(buf, terrno);
556!
100
  int32_t actLen = tSerializeSStreamMsgVTableInfo(buf, len, rsp);
556✔
101
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
556!
102
  *data = buf;
556✔
103
  *size = len;
556✔
104
  buf = NULL;
556✔
105
end:
556✔
106
  rpcFreeCont(buf);
556✔
107
  return code;
556✔
108
}
109

110
static int32_t buildTsRsp(const SStreamTsResponse* tsRsp, void** data, size_t* size) {
775✔
111
  int32_t code = 0;
775✔
112
  int32_t lino = 0;
775✔
113
  void*   buf = NULL;
775✔
114
  int32_t len = tSerializeSStreamTsResponse(NULL, 0, tsRsp);
775✔
115
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
775!
116
  buf = rpcMallocCont(len);
775✔
117
  STREAM_CHECK_NULL_GOTO(buf, terrno);
774!
118
  int32_t actLen = tSerializeSStreamTsResponse(buf, len, tsRsp);
774✔
119
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
774!
120
  *data = buf;
774✔
121
  *size = len;
774✔
122
  buf = NULL;
774✔
123
end:
774✔
124
  rpcFreeCont(buf);
774✔
125
  return code;
775✔
126
}
127

128

129
static int32_t buildRsp(SSDataBlock* pBlock, void** data, size_t* size) {
58,459✔
130
  int32_t code = 0;
58,459✔
131
  int32_t lino = 0;
58,459✔
132
  void*   buf = NULL;
58,459✔
133
  STREAM_CHECK_CONDITION_GOTO(pBlock == NULL || pBlock->info.rows == 0, TSDB_CODE_SUCCESS);
58,459!
134
  size_t dataEncodeSize = blockGetEncodeSize(pBlock);
22,310✔
135
  buf = rpcMallocCont(dataEncodeSize);
22,313✔
136
  STREAM_CHECK_NULL_GOTO(buf, terrno);
22,316!
137
  int32_t actualLen = blockEncode(pBlock, buf, dataEncodeSize, taosArrayGetSize(pBlock->pDataBlock));
22,316✔
138
  STREAM_CHECK_CONDITION_GOTO(actualLen < 0, terrno);
22,315!
139
  *data = buf;
22,315✔
140
  *size = dataEncodeSize;
22,315✔
141
  buf = NULL;
22,315✔
142
end:
58,464✔
143
  rpcFreeCont(buf);
58,464✔
144
  return code;
58,461✔
145
}
146

147
static int32_t resetTsdbReader(SStreamReaderTaskInner* pTask) {
591✔
148
  int32_t        pNum = 1;
591✔
149
  STableKeyInfo* pList = NULL;
591✔
150
  int32_t        code = 0;
591✔
151
  int32_t        lino = 0;
591✔
152
  STREAM_CHECK_RET_GOTO(qStreamGetTableList(pTask->pTableList, pTask->currentGroupIndex, &pList, &pNum));
591!
153
  STREAM_CHECK_RET_GOTO(pTask->api.tsdReader.tsdSetQueryTableList(pTask->pReader, pList, pNum));
591!
154

155
  cleanupQueryTableDataCond(&pTask->cond);
591✔
156
  STREAM_CHECK_RET_GOTO(qStreamInitQueryTableDataCond(&pTask->cond, pTask->options.order, pTask->options.schemas, true,
591!
157
                                                      pTask->options.twindows, pTask->options.suid, pTask->options.ver));
158
  STREAM_CHECK_RET_GOTO(pTask->api.tsdReader.tsdReaderResetStatus(pTask->pReader, &pTask->cond));
591!
159

160
end:
591✔
161
  STREAM_PRINT_LOG_END(code, lino);
591!
162
  return code;
591✔
163
}
164

165
static int32_t buildWalMetaBlock(SSDataBlock* pBlock, int8_t type, int64_t gid, bool isVTable, int64_t uid,
15,447✔
166
                                 int64_t skey, int64_t ekey, int64_t ver, int64_t rows) {
167
  int32_t code = 0;
15,447✔
168
  int32_t lino = 0;
15,447✔
169
  int32_t index = 0;
15,447✔
170
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &type));
15,447!
171
  if (!isVTable) {
15,431✔
172
    STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &gid));
3,706!
173
  }
174
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &uid));
15,399!
175
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &skey));
15,363!
176
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &ekey));
15,337!
177
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &ver));
15,314!
178
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &rows));
15,312!
179

180
end:
15,319✔
181
  STREAM_PRINT_LOG_END(code, lino)
15,319!
182
  return code;
15,321✔
183
}
184

185
static void buildTSchema(STSchema* pTSchema, int32_t ver, col_id_t colId, int8_t type, int32_t bytes) {
15,394✔
186
  pTSchema->numOfCols = 1;
15,394✔
187
  pTSchema->version = ver;
15,394✔
188
  pTSchema->columns[0].colId = colId;
15,394✔
189
  pTSchema->columns[0].type = type;
15,394✔
190
  pTSchema->columns[0].bytes = bytes;
15,394✔
191
}
15,394✔
192

193
int32_t retrieveWalMetaData(SSubmitTbData* pSubmitTbData, void* pTableList, bool isVTable, SSDataBlock* pBlock,
79,954✔
194
                            int64_t ver) {
195
  int32_t code = 0;
79,954✔
196
  int32_t lino = 0;
79,954✔
197

198
  int64_t   uid = pSubmitTbData->uid;
79,954✔
199
  int32_t   numOfRows = 0;
79,954✔
200
  int64_t   skey = 0;
79,954✔
201
  int64_t   ekey = 0;
79,954✔
202
  STSchema* pTSchema = NULL;
79,954✔
203
  uint64_t  gid = 0;
79,954✔
204
  if (isVTable) {
79,954✔
205
    STREAM_CHECK_CONDITION_GOTO(taosHashGet(pTableList, &uid, sizeof(uid)) == NULL, TDB_CODE_SUCCESS);
34,533✔
206
  } else {
207
    STREAM_CHECK_CONDITION_GOTO(!qStreamUidInTableList(pTableList, uid), TDB_CODE_SUCCESS);
45,421✔
208
    gid = qStreamGetGroupId(pTableList, uid);
3,665✔
209
  }
210

211
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
15,421✔
212
    SColData* pCol = taosArrayGet(pSubmitTbData->aCol, 0);
14✔
213
    STREAM_CHECK_NULL_GOTO(pCol, terrno);
14!
214
    numOfRows = pCol->nVal;
14✔
215

216
    SColVal colVal = {0};
14✔
217
    STREAM_CHECK_RET_GOTO(tColDataGetValue(pCol, 0, &colVal));
14!
218
    skey = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
14✔
219

220
    STREAM_CHECK_RET_GOTO(tColDataGetValue(pCol, numOfRows - 1, &colVal));
14!
221
    ekey = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
14✔
222
  } else {
223
    numOfRows = taosArrayGetSize(pSubmitTbData->aRowP);
15,407✔
224
    SRow* pRow = taosArrayGetP(pSubmitTbData->aRowP, 0);
15,390✔
225
    STREAM_CHECK_NULL_GOTO(pRow, terrno);
15,379!
226
    SColVal colVal = {0};
15,379✔
227
    pTSchema = taosMemoryCalloc(1, sizeof(STSchema) + sizeof(STColumn));
15,379!
228
    STREAM_CHECK_NULL_GOTO(pTSchema, terrno);
15,422!
229
    buildTSchema(pTSchema, pSubmitTbData->sver, PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES);
15,422✔
230
    STREAM_CHECK_RET_GOTO(tRowGet(pRow, pTSchema, 0, &colVal));
15,387!
231
    skey = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
15,377✔
232
    pRow = taosArrayGetP(pSubmitTbData->aRowP, numOfRows - 1);
15,377✔
233
    STREAM_CHECK_RET_GOTO(tRowGet(pRow, pTSchema, 0, &colVal));
15,366!
234
    ekey = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
15,360✔
235
  }
236

237
  STREAM_CHECK_RET_GOTO(buildWalMetaBlock(pBlock, WAL_SUBMIT_DATA, gid, isVTable, uid, skey, ekey, ver, numOfRows));
15,374!
238
  pBlock->info.rows++;
15,251✔
239
  stDebug("stream reader scan submit data:uid %" PRIu64 ", skey %" PRIu64 ", ekey %" PRIu64 ", gid %" PRIu64
15,251✔
240
          ", rows:%d",
241
          uid, skey, ekey, gid, numOfRows);
242

243
end:
3,148✔
244
  taosMemoryFree(pTSchema);
80,056!
245
  STREAM_PRINT_LOG_END(code, lino)
80,161!
246
  return code;
80,175✔
247
}
248

249
int32_t retrieveWalData(SVnode* pVnode, SSubmitTbData* pSubmitTbData, SSDataBlock* pBlock, STimeWindow* window) {
17,267✔
250
  stDebug("stream reader retrieve data block %p", pSubmitTbData);
17,267✔
251
  int32_t code = 0;
17,267✔
252
  int32_t lino = 0;
17,267✔
253

254
  int32_t ver = pSubmitTbData->sver;
17,267✔
255
  int64_t uid = pSubmitTbData->uid;
17,267✔
256
  int32_t numOfRows = 0;
17,267✔
257

258
  STSchema* schemas = metaGetTbTSchema(pVnode->pMeta, uid, ver, 1);
17,267✔
259
  STREAM_CHECK_NULL_GOTO(schemas, TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND);
17,280!
260
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
17,280✔
261
    SColData* pCol = taosArrayGet(pSubmitTbData->aCol, 0);
249✔
262
    STREAM_CHECK_NULL_GOTO(pCol, terrno);
249!
263
    int32_t rowStart = 0;
249✔
264
    int32_t rowEnd = pCol->nVal;
249✔
265
    if (window != NULL) {
249!
266
      SColVal colVal = {0};
249✔
267
      rowStart = -1;
249✔
268
      rowEnd = -1;
249✔
269
      for (int32_t k = 0; k < pCol->nVal; k++) {
4,482✔
270
        STREAM_CHECK_RET_GOTO(tColDataGetValue(pCol, k, &colVal));
4,233!
271
        int64_t ts = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
4,233✔
272
        if (ts >= window->skey && rowStart == -1) {
4,233!
273
          rowStart = k;
249✔
274
        }
275
        if (ts > window->ekey && rowEnd == -1) {
4,233!
276
          rowEnd = k;
×
277
        }
278
      }
279
      STREAM_CHECK_CONDITION_GOTO(rowStart == -1 || rowStart == rowEnd, TDB_CODE_SUCCESS);
249!
280

281
      if (rowStart != -1 && rowEnd == -1) {
249!
282
        rowEnd = pCol->nVal;
249✔
283
      }
284
    }
285
    numOfRows = rowEnd - rowStart;
249✔
286
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, numOfRows));
249!
287
    for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
1,066✔
288
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
817✔
289
      STREAM_CHECK_NULL_GOTO(pColData, terrno);
817!
290
      if (i >= taosArrayGetSize(pSubmitTbData->aCol)) {
817!
291
        break;
×
292
      }
293
      for (int32_t j = 0; j < taosArrayGetSize(pSubmitTbData->aCol); j++) {
7,353✔
294
        SColData* pCol = taosArrayGet(pSubmitTbData->aCol, j);
6,536✔
295
        STREAM_CHECK_NULL_GOTO(pCol, terrno);
6,536!
296
        if (pCol->cid == pColData->info.colId) {
6,536✔
297
          for (int32_t k = rowStart; k < rowEnd; k++) {
14,706✔
298
            SColVal colVal = {0};
13,889✔
299
            STREAM_CHECK_RET_GOTO(tColDataGetValue(pCol, k, &colVal));
13,889!
300
            STREAM_CHECK_RET_GOTO(colDataSetVal(pColData, k, VALUE_GET_DATUM(&colVal.value, colVal.value.type),
13,889!
301
                                                !COL_VAL_IS_VALUE(&colVal)));
302
          }
303
        }
304
      }
305
    }
306
  } else {
307
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, taosArrayGetSize(pSubmitTbData->aRowP)));
17,031!
308
    for (int32_t j = 0; j < taosArrayGetSize(pSubmitTbData->aRowP); j++) {
542,596✔
309
      SRow* pRow = taosArrayGetP(pSubmitTbData->aRowP, j);
525,568✔
310
      STREAM_CHECK_NULL_GOTO(pRow, terrno);
525,578!
311
      SColVal colVal = {0};
525,578✔
312
      STREAM_CHECK_RET_GOTO(tRowGet(pRow, schemas, 0, &colVal));
525,578!
313
      int64_t ts = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
525,577✔
314
      if (window != NULL && (ts < window->skey || ts > window->ekey)) {
525,577✔
315
        continue;
43✔
316
      }
317
      for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {  // reader todo test null
1,604,699✔
318
        if (i >= schemas->numOfCols) {
1,593,909✔
319
          break;
514,719✔
320
        }
321
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
1,079,190✔
322
        STREAM_CHECK_NULL_GOTO(pColData, terrno);
1,079,193!
323
        SColVal colVal = {0};
1,079,194✔
324
        int32_t sourceIdx = 0;
1,079,194✔
325
        while (1) {
326
          STREAM_CHECK_RET_GOTO(tRowGet(pRow, schemas, sourceIdx, &colVal));
1,699,046!
327
          if (colVal.cid < pColData->info.colId) {
1,699,016✔
328
            sourceIdx++;
619,852✔
329
            continue;
619,852✔
330
          } else {
331
            break;
1,079,164✔
332
          }
333
        }
334
        if (colVal.cid == pColData->info.colId && COL_VAL_IS_VALUE(&colVal)) {
1,079,164!
335
          if (IS_VAR_DATA_TYPE(colVal.value.type) || colVal.value.type == TSDB_DATA_TYPE_DECIMAL){
1,062,654!
336
            STREAM_CHECK_RET_GOTO(varColSetVarData(pColData, numOfRows, colVal.value.pData, colVal.value.nData, !COL_VAL_IS_VALUE(&colVal)));
18!
337
          } else {
338
            STREAM_CHECK_RET_GOTO(colDataSetVal(pColData, numOfRows, (const char*)(&(colVal.value.val)), !COL_VAL_IS_VALUE(&colVal)));
1,062,636!
339
          }
340
        } else {
341
          colDataSetNULL(pColData, numOfRows);
16,510!
342
        }
343
      }
344
      numOfRows++;
525,526✔
345
    }
346
  }
347

348
  pBlock->info.rows = numOfRows;
17,284✔
349

350
end:
17,284✔
351
  taosMemoryFree(schemas);
17,284!
352
  STREAM_PRINT_LOG_END(code, lino)
17,285!
353
  return code;
17,284✔
354
}
355

356
static int32_t buildDeleteData(void* pTableList, bool isVTable, SSDataBlock* pBlock, SDeleteRes* req, int64_t uid, int64_t ver){
149✔
357
  int32_t    code = 0;
149✔
358
  int32_t    lino = 0;
149✔
359
  uint64_t   gid = 0;
149✔
360
  stDebug("stream reader scan delete start data:uid %" PRIu64 ", skey %" PRIu64 ", ekey %" PRIu64, uid, req->skey, req->ekey);
149✔
361
  if (isVTable) {
149✔
362
    STREAM_CHECK_CONDITION_GOTO(taosHashGet(pTableList, &uid, sizeof(uid)) == NULL, TDB_CODE_SUCCESS)
45✔
363
  } else {
364
    gid = qStreamGetGroupId(pTableList, uid);
104✔
365
    STREAM_CHECK_CONDITION_GOTO(gid == -1, TDB_CODE_SUCCESS);
104✔
366
  }
367
  STREAM_CHECK_RET_GOTO(
74!
368
      buildWalMetaBlock(pBlock, WAL_DELETE_DATA, gid, isVTable, uid, req->skey, req->ekey, ver, 1));
369
  pBlock->info.rows++;
74✔
370

371
  stDebug("stream reader scan delete end data:uid %" PRIu64 ", skey %" PRIu64 ", ekey %" PRIu64, uid, req->skey, req->ekey);
74✔
372
end:
48✔
373
  return code;
149✔
374
}
375

376
static int32_t scanDeleteData(void* pTableList, bool isVTable, SSDataBlock* pBlock, void* data, int32_t len,
149✔
377
                              int64_t ver) {
378
  int32_t    code = 0;
149✔
379
  int32_t    lino = 0;
149✔
380
  SDecoder   decoder = {0};
149✔
381
  SDeleteRes req = {0};
149✔
382
  req.uidList = taosArrayInit(0, sizeof(tb_uid_t));
149✔
383
  tDecoderInit(&decoder, data, len);
149✔
384
  STREAM_CHECK_RET_GOTO(tDecodeDeleteRes(&decoder, &req));
149!
385
  
386
  for (int32_t i = 0; i < taosArrayGetSize(req.uidList); i++) {
298✔
387
    uint64_t* uid = taosArrayGet(req.uidList, i);
149✔
388
    STREAM_CHECK_NULL_GOTO(uid, terrno);
149!
389
    STREAM_CHECK_RET_GOTO(buildDeleteData(pTableList, isVTable, pBlock, &req, *uid, ver));
149!
390
  }
391

392
end:
149✔
393
  taosArrayDestroy(req.uidList);
149✔
394
  tDecoderClear(&decoder);
149✔
395
  return code;
149✔
396
}
397

398
static int32_t scanDropTable(void* pTableList, bool isVTable, SSDataBlock* pBlock, void* data, int32_t len,
×
399
                             int64_t ver) {
400
  int32_t  code = 0;
×
401
  int32_t  lino = 0;
×
402
  SDecoder decoder = {0};
×
403

404
  SVDropTbBatchReq req = {0};
×
405
  tDecoderInit(&decoder, data, len);
×
406
  STREAM_CHECK_RET_GOTO(tDecodeSVDropTbBatchReq(&decoder, &req));
×
407
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
408
    SVDropTbReq* pDropTbReq = req.pReqs + iReq;
×
409
    STREAM_CHECK_NULL_GOTO(pDropTbReq, TSDB_CODE_INVALID_PARA);
×
410
    uint64_t gid = 0;
×
411
    if (isVTable) {
×
412
      if (taosHashGet(pTableList, &pDropTbReq->uid, sizeof(pDropTbReq->uid)) == NULL) {
×
413
        continue;
×
414
      }
415
    } else {
416
      gid = qStreamGetGroupId(pTableList, pDropTbReq->uid);
×
417
      if (gid == -1) continue;
×
418
    }
419

420
    STREAM_CHECK_RET_GOTO(buildWalMetaBlock(pBlock, WAL_DELETE_TABLE, gid, isVTable, pDropTbReq->uid, 0, 0, ver, 1));
×
421
    pBlock->info.rows++;
×
422
    stDebug("stream reader scan drop :uid %" PRIu64 ", gid %" PRIu64, pDropTbReq->uid, gid);
×
423
  }
424

425
end:
×
426
  tDecoderClear(&decoder);
×
427
  return code;
×
428
}
429

430
static int32_t scanSubmitData(void* pTableList, bool isVTable, SSDataBlock* pBlock, void* data, int32_t len,
80,132✔
431
                              int64_t ver) {
432
  int32_t  code = 0;
80,132✔
433
  int32_t  lino = 0;
80,132✔
434
  SDecoder decoder = {0};
80,132✔
435

436
  SSubmitReq2 submit = {0};
80,132✔
437
  tDecoderInit(&decoder, data, len);
80,132✔
438
  STREAM_CHECK_RET_GOTO(tDecodeSubmitReq(&decoder, &submit, NULL));
80,082!
439

440
  int32_t numOfBlocks = taosArrayGetSize(submit.aSubmitTbData);
80,021✔
441

442
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, pBlock->info.rows + numOfBlocks));
80,017!
443

444
  int32_t nextBlk = -1;
79,994✔
445
  while (++nextBlk < numOfBlocks) {
160,156✔
446
    stDebug("stream reader scan submit, next data block %d/%d", nextBlk, numOfBlocks);
79,996✔
447
    SSubmitTbData* pSubmitTbData = taosArrayGet(submit.aSubmitTbData, nextBlk);
79,996✔
448
    STREAM_CHECK_NULL_GOTO(pSubmitTbData, terrno);
79,963!
449
    STREAM_CHECK_RET_GOTO(retrieveWalMetaData(pSubmitTbData, pTableList, isVTable, pBlock, ver));
79,963!
450
  }
451
end:
80,160✔
452
  tDestroySubmitReq(&submit, TSDB_MSG_FLG_DECODE);
80,160✔
453
  tDecoderClear(&decoder);
80,141✔
454
  return code;
80,219✔
455
}
456

457
static int32_t scanWal(SVnode* pVnode, void* pTableList, bool isVTable, SSDataBlock* pBlock, int64_t lastVer,
8,888✔
458
                       int8_t deleteData, int8_t deleteTb, int64_t ctime, int64_t* retVer) {
459
  int32_t code = 0;
8,888✔
460
  int32_t lino = 0;
8,888✔
461

462
  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
8,888✔
463
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);
8,889!
464
  *retVer = walGetAppliedVer(pWalReader->pWal);
8,889✔
465
  STREAM_CHECK_CONDITION_GOTO(walReaderSeekVer(pWalReader, lastVer + 1) != 0, TSDB_CODE_SUCCESS);
8,894✔
466

467
  while (1) {
81,270✔
468
    *retVer = walGetAppliedVer(pWalReader->pWal);
83,051✔
469
    STREAM_CHECK_CONDITION_GOTO(walNextValidMsg(pWalReader, true) < 0, TSDB_CODE_SUCCESS);
83,027✔
470

471
    SWalCont* wCont = &pWalReader->pHead->head;
81,240✔
472
    if (wCont->ingestTs / 1000 > ctime) break;
81,240!
473
    void*   data = POINTER_SHIFT(wCont->body, sizeof(SMsgHead));
81,240✔
474
    int32_t len = wCont->bodyLen - sizeof(SMsgHead);
81,240✔
475
    int64_t ver = wCont->version;
81,240✔
476

477
    stDebug("vgId:%d stream reader scan wal ver:%" PRId64 ", type:%d, deleteData:%d, deleteTb:%d",
81,240✔
478
      TD_VID(pVnode), ver, wCont->msgType, deleteData, deleteTb);
479
    if (wCont->msgType == TDMT_VND_DELETE && deleteData != 0) {
81,225✔
480
      STREAM_CHECK_RET_GOTO(scanDeleteData(pTableList, isVTable, pBlock, data, len, ver));
149!
481
    } else if (wCont->msgType == TDMT_VND_DROP_TABLE && deleteTb != 0) {
81,076!
482
      STREAM_CHECK_RET_GOTO(scanDropTable(pTableList, isVTable, pBlock, data, len, ver));
×
483
    } else if (wCont->msgType == TDMT_VND_SUBMIT) {
81,076✔
484
      data = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
80,158✔
485
      len = wCont->bodyLen - sizeof(SSubmitReq2Msg);
80,158✔
486
      STREAM_CHECK_RET_GOTO(scanSubmitData(pTableList, isVTable, pBlock, data, len, ver));
80,158!
487
    }
488

489
    if (pBlock->info.rows >= STREAM_RETURN_ROWS_NUM) {
81,270!
490
      break;
×
491
    }
492
  }
493

494
end:
8,889✔
495
  walCloseReader(pWalReader);
8,889✔
496
  STREAM_PRINT_LOG_END(code, lino);
8,889!
497
  return code;
8,896✔
498
}
499

500
int32_t scanWalOneVer(SVnode* pVnode, SSDataBlock* pBlock, SSDataBlock* pBlockRet,
17,283✔
501
                      int64_t ver, int64_t uid, STimeWindow* window) {
502
  int32_t     code = 0;
17,283✔
503
  int32_t     lino = 0;
17,283✔
504
  SSubmitReq2 submit = {0};
17,283✔
505
  SDecoder    decoder = {0};
17,283✔
506

507
  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
17,283✔
508
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);
17,281!
509

510
  STREAM_CHECK_RET_GOTO(walFetchHead(pWalReader, ver));
17,281!
511
  STREAM_CHECK_CONDITION_GOTO(pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT, TSDB_CODE_STREAM_WAL_VER_NOT_DATA);
17,276!
512
  STREAM_CHECK_RET_GOTO(walFetchBody(pWalReader));
17,276!
513
  void*   pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
17,282✔
514
  int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
17,282✔
515

516
  int32_t nextBlk = -1;
17,282✔
517
  tDecoderInit(&decoder, pBody, bodyLen);
17,282✔
518
  STREAM_CHECK_RET_GOTO(tDecodeSubmitReq(&decoder, &submit, NULL));
17,278!
519

520
  int32_t numOfBlocks = taosArrayGetSize(submit.aSubmitTbData);
17,277✔
521
  while (++nextBlk < numOfBlocks) {
34,544✔
522
    stDebug("stream reader next data block %d/%d", nextBlk, numOfBlocks);
17,269✔
523
    SSubmitTbData* pSubmitTbData = taosArrayGet(submit.aSubmitTbData, nextBlk);
17,269✔
524
    STREAM_CHECK_NULL_GOTO(pSubmitTbData, terrno);
17,268!
525
    if (pSubmitTbData->uid != uid) {
17,268!
526
      stDebug("stream reader skip data block uid:%" PRId64, pSubmitTbData->uid);
×
527
      continue;
×
528
    }
529
    STREAM_CHECK_RET_GOTO(retrieveWalData(pVnode, pSubmitTbData, pBlock, window));
17,268!
530
    printDataBlock(pBlock, __func__, "");
17,281✔
531

532
    STREAM_CHECK_RET_GOTO(blockDataMerge(pBlockRet, pBlock));
17,274!
533
    blockDataCleanup(pBlock);
17,273✔
534
  }
535

536
end:
17,275✔
537
  tDestroySubmitReq(&submit, TSDB_MSG_FLG_DECODE);
17,275✔
538
  walCloseReader(pWalReader);
17,280✔
539
  tDecoderClear(&decoder);
17,286✔
540
  STREAM_PRINT_LOG_END(code, lino);
17,286!
541
  return code;
17,286✔
542
}
543

544
static int32_t processTag(SVnode* pVnode, SExprInfo* pExpr, int32_t numOfExpr, SStorageAPI* api, SSDataBlock* pBlock) {
6,419✔
545
  int32_t     code = 0;
6,419✔
546
  int32_t     lino = 0;
6,419✔
547
  SMetaReader mr = {0};
6,419✔
548

549
  if (numOfExpr == 0) {
6,419!
550
    return TSDB_CODE_SUCCESS;
×
551
  }
552
  api->metaReaderFn.initReader(&mr, pVnode, META_READER_LOCK, &api->metaFn);
6,419✔
553
  code = api->metaReaderFn.getEntryGetUidCache(&mr, pBlock->info.id.uid);
6,420✔
554
  if (code != TSDB_CODE_SUCCESS) {
6,407!
555
    stError("failed to get table meta, uid:%" PRId64 ", code:%s", pBlock->info.id.uid, tstrerror(code));
×
556
  }
557
  api->metaReaderFn.readerReleaseLock(&mr);
6,407✔
558
  for (int32_t j = 0; j < numOfExpr; ++j) {
28,761✔
559
    const SExprInfo* pExpr1 = &pExpr[j];
22,344✔
560
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;
22,344✔
561

562
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
22,344✔
563
    STREAM_CHECK_NULL_GOTO(pColInfoData, terrno);
22,341✔
564
    if (mr.me.name == NULL) {
22,338!
565
      colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
×
566
      continue;
×
567
    }
568
    colInfoDataCleanup(pColInfoData, pBlock->info.rows);
22,338✔
569

570
    int32_t functionId = pExpr1->pExpr->_function.functionId;
22,339✔
571

572
    // this is to handle the tbname
573
    if (fmIsScanPseudoColumnFunc(functionId)) {
22,339✔
574
      int32_t fType = pExpr1->pExpr->_function.functionType;
6,416✔
575
      if (fType == FUNCTION_TYPE_TBNAME) {
6,416!
576
        STREAM_CHECK_RET_GOTO(setTbNameColData(pBlock, pColInfoData, functionId, mr.me.name));
6,416!
577
        pColInfoData->info.colId = -1;
6,409✔
578
      }
579
    } else {  // these are tags
580
      STagVal tagVal = {0};
15,927✔
581
      tagVal.cid = pExpr1->base.pParam[0].pCol->colId;
15,927✔
582
      const char* p = api->metaFn.extractTagVal(mr.me.ctbEntry.pTags, pColInfoData->info.type, &tagVal);
15,927✔
583

584
      char* data = NULL;
15,922✔
585
      if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
15,922!
586
        data = tTagValToData((const STagVal*)p, false);
15,919✔
587
      } else {
588
        data = (char*)p;
3✔
589
      }
590

591
      bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
15,924!
592
      if (isNullVal) {
15,924!
593
        colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
×
594
      } else {
595
        code = colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows, false);
15,924✔
596
        if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
15,930!
597
          taosMemoryFree(data);
8,888!
598
        }
599
        STREAM_CHECK_RET_GOTO(code);
15,930!
600
      }
601
    }
602
  }
603

604
end:
6,417✔
605
  api->metaReaderFn.clearReader(&mr);
6,417✔
606

607
  STREAM_PRINT_LOG_END(code, lino);
6,415!
608
  return code;
6,417✔
609
}
610

611
static int32_t processWalVerData(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamInfo, int64_t ver, bool isTrigger,
6,206✔
612
                                 int64_t uid, STimeWindow* window, SSDataBlock* pSrcBlock, SSDataBlock** pBlock) {
613
  int32_t      code = 0;
6,206✔
614
  int32_t      lino = 0;
6,206✔
615
  SFilterInfo* pFilterInfo = NULL;
6,206✔
616
  SSDataBlock* pBlock1 = NULL;
6,206✔
617
  SSDataBlock* pBlock2 = NULL;
6,206✔
618

619
  SExprInfo*   pExpr = sStreamInfo->pExprInfo;
6,206✔
620
  int32_t      numOfExpr = sStreamInfo->numOfExpr;
6,206✔
621

622
  STREAM_CHECK_RET_GOTO(filterInitFromNode(sStreamInfo->pConditions, &pFilterInfo, 0, NULL));
6,206!
623

624
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamInfo->triggerResBlock, false, &pBlock1));
6,210!
625
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamInfo->triggerResBlock, false, &pBlock2));
6,211!
626
  if (!isTrigger) STREAM_CHECK_RET_GOTO(createOneDataBlock(pSrcBlock, false, pBlock));
6,210!
627

628
  pBlock2->info.id.uid = uid;
6,212✔
629
  pBlock1->info.id.uid = uid;
6,212✔
630

631
  STREAM_CHECK_RET_GOTO(scanWalOneVer(pVnode, pBlock1, pBlock2, ver, uid, window));
6,212!
632

633
  if (pBlock2->info.rows > 0) {
6,213!
634
    SStorageAPI  api = {0};
6,213✔
635
    initStorageAPI(&api);
6,213✔
636
    STREAM_CHECK_RET_GOTO(processTag(pVnode, pExpr, numOfExpr, &api, pBlock2));
6,211!
637
  }
638
  STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock2, pFilterInfo));
6,205!
639
  if (!isTrigger) {
6,211✔
640
    blockDataTransform(*pBlock, pBlock2);
3,875✔
641
  } else {
642
    *pBlock = pBlock2;
2,336✔
643
    pBlock2 = NULL;  
2,336✔
644
  }
645

646
  printDataBlock(*pBlock, __func__, "processWalVerData2");
6,211✔
647

648
end:
6,206✔
649
  STREAM_PRINT_LOG_END(code, lino);
6,206!
650
  filterFreeInfo(pFilterInfo);
6,206✔
651
  blockDataDestroy(pBlock1);
6,212✔
652
  blockDataDestroy(pBlock2);
6,209✔
653
  return code;
6,209✔
654
}
655

656
static int32_t buildScheamFromMeta(SVnode* pVnode, int64_t uid, SArray** schemas) {
11,446✔
657
  int32_t code = 0;
11,446✔
658
  int32_t lino = 0;
11,446✔
659
  SMetaReader metaReader = {0};
11,446✔
660
  SStorageAPI api = {0};
11,446✔
661
  initStorageAPI(&api);
11,446✔
662
  *schemas = taosArrayInit(8, sizeof(SSchema));
11,448✔
663
  STREAM_CHECK_NULL_GOTO(*schemas, terrno);
11,448!
664
  
665
  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
11,448✔
666
  STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, uid));
11,448!
667

668
  SSchemaWrapper* sSchemaWrapper = NULL;
11,446✔
669
  if (metaReader.me.type == TD_CHILD_TABLE) {
11,446✔
670
    int64_t suid = metaReader.me.ctbEntry.suid;
11,329✔
671
    tDecoderClear(&metaReader.coder);
11,329✔
672
    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, suid));
11,331!
673
    sSchemaWrapper = &metaReader.me.stbEntry.schemaRow;
11,329✔
674
  } else if (metaReader.me.type == TD_NORMAL_TABLE) {
117!
675
    sSchemaWrapper = &metaReader.me.ntbEntry.schemaRow;
117✔
676
  } else {
677
    qError("invalid table type:%d", metaReader.me.type);
×
678
  }
679

680
  for (size_t j = 0; j < sSchemaWrapper->nCols; j++) {
69,003✔
681
    SSchema* s = sSchemaWrapper->pSchema + j;
57,559✔
682
    STREAM_CHECK_NULL_GOTO(taosArrayPush(*schemas, s), terrno);
115,117!
683
  }
684

685
end:
11,444✔
686
  api.metaReaderFn.clearReader(&metaReader);
11,444✔
687
  STREAM_PRINT_LOG_END(code, lino);
11,446!
688
  if (code != 0)  taosArrayDestroy(*schemas);
11,447!
689
  return code;
11,446✔
690
}
691

692
static int32_t shrinkScheams(SArray* cols, SArray* schemas) {
11,447✔
693
  int32_t code = 0;
11,447✔
694
  int32_t lino = 0;
11,447✔
695
  size_t  schemaLen = taosArrayGetSize(schemas);
11,447✔
696
  STREAM_CHECK_RET_GOTO(taosArrayEnsureCap(schemas, schemaLen + taosArrayGetSize(cols)));
11,447!
697
  for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
58,336✔
698
    col_id_t* id = taosArrayGet(cols, i);
46,888✔
699
    STREAM_CHECK_NULL_GOTO(id, terrno);
46,888!
700
    for (size_t i = 0; i < schemaLen; i++) {
144,526✔
701
      SSchema* s = taosArrayGet(schemas, i);
144,525✔
702
      STREAM_CHECK_NULL_GOTO(s, terrno);
144,524!
703
      if (*id == s->colId) {
144,524✔
704
        STREAM_CHECK_NULL_GOTO(taosArrayPush(schemas, s), terrno);
46,887!
705
        break;
46,887✔
706
      }
707
    }
708
  }
709
  taosArrayPopFrontBatch(schemas, schemaLen);
11,443✔
710

711
end:
11,447✔
712
  return code;
11,447✔
713
}
714

715
static int32_t processWalVerDataVTable(SVnode* pVnode, SArray *cids, int64_t ver,
11,072✔
716
  int64_t uid, STimeWindow* window, SSDataBlock** pBlock) {
717
  int32_t      code = 0;
11,072✔
718
  int32_t      lino = 0;
11,072✔
719
  SArray*      schemas = NULL;
11,072✔
720

721
  SSDataBlock* pBlock1 = NULL;
11,072✔
722
  SSDataBlock* pBlock2 = NULL;
11,072✔
723

724
  STREAM_CHECK_RET_GOTO(buildScheamFromMeta(pVnode, uid, &schemas));
11,072!
725
  STREAM_CHECK_RET_GOTO(shrinkScheams(cids, schemas));
11,072!
726
  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, &pBlock1));
11,072!
727
  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, &pBlock2));
11,073!
728

729
  pBlock2->info.id.uid = uid;
11,072✔
730
  pBlock1->info.id.uid = uid;
11,072✔
731

732
  STREAM_CHECK_RET_GOTO(scanWalOneVer(pVnode, pBlock1, pBlock2, ver, uid, window));
11,072!
733
  printDataBlock(pBlock2, __func__, "");
11,073✔
734

735
  *pBlock = pBlock2;
11,073✔
736
  pBlock2 = NULL;
11,073✔
737

738
end:
11,073✔
739
  STREAM_PRINT_LOG_END(code, lino);
11,073!
740
  blockDataDestroy(pBlock1);
11,073✔
741
  blockDataDestroy(pBlock2);
11,071✔
742
  taosArrayDestroy(schemas);
11,070✔
743
  return code;
11,072✔
744
}
745

746
static int32_t createTSAndCondition(int64_t start, int64_t end, SLogicConditionNode** pCond,
463✔
747
                                    STargetNode* pTargetNodeTs) {
748
  int32_t code = 0;
463✔
749
  int32_t lino = 0;
463✔
750

751
  SColumnNode*         pCol = NULL;
463✔
752
  SColumnNode*         pCol1 = NULL;
463✔
753
  SValueNode*          pVal = NULL;
463✔
754
  SValueNode*          pVal1 = NULL;
463✔
755
  SOperatorNode*       op = NULL;
463✔
756
  SOperatorNode*       op1 = NULL;
463✔
757
  SLogicConditionNode* cond = NULL;
463✔
758

759
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pCol));
463!
760
  pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
463✔
761
  pCol->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
463✔
762
  pCol->node.resType.bytes = LONG_BYTES;
463✔
763
  pCol->slotId = pTargetNodeTs->slotId;
463✔
764
  pCol->dataBlockId = pTargetNodeTs->dataBlockId;
463✔
765

766
  STREAM_CHECK_RET_GOTO(nodesCloneNode((SNode*)pCol, (SNode**)&pCol1));
463!
767

768
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&pVal));
463!
769
  pVal->node.resType.type = TSDB_DATA_TYPE_BIGINT;
463✔
770
  pVal->node.resType.bytes = LONG_BYTES;
463✔
771
  pVal->datum.i = start;
463✔
772
  pVal->typeData = start;
463✔
773

774
  STREAM_CHECK_RET_GOTO(nodesCloneNode((SNode*)pVal, (SNode**)&pVal1));
463!
775
  pVal1->datum.i = end;
463✔
776
  pVal1->typeData = end;
463✔
777

778
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&op));
463!
779
  op->opType = OP_TYPE_GREATER_EQUAL;
463✔
780
  op->node.resType.type = TSDB_DATA_TYPE_BOOL;
463✔
781
  op->node.resType.bytes = CHAR_BYTES;
463✔
782
  op->pLeft = (SNode*)pCol;
463✔
783
  op->pRight = (SNode*)pVal;
463✔
784
  pCol = NULL;
463✔
785
  pVal = NULL;
463✔
786

787
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&op1));
463!
788
  op1->opType = OP_TYPE_LOWER_EQUAL;
463✔
789
  op1->node.resType.type = TSDB_DATA_TYPE_BOOL;
463✔
790
  op1->node.resType.bytes = CHAR_BYTES;
463✔
791
  op1->pLeft = (SNode*)pCol1;
463✔
792
  op1->pRight = (SNode*)pVal1;
463✔
793
  pCol1 = NULL;
463✔
794
  pVal1 = NULL;
463✔
795

796
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_LOGIC_CONDITION, (SNode**)&cond));
463!
797
  cond->condType = LOGIC_COND_TYPE_AND;
463✔
798
  cond->node.resType.type = TSDB_DATA_TYPE_BOOL;
463✔
799
  cond->node.resType.bytes = CHAR_BYTES;
463✔
800
  STREAM_CHECK_RET_GOTO(nodesMakeList(&cond->pParameterList));
463!
801
  STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)op));
463!
802
  op = NULL;
463✔
803
  STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)op1));
463!
804
  op1 = NULL;
463✔
805

806
  *pCond = cond;
463✔
807

808
end:
463✔
809
  if (code != 0) {
463!
810
    nodesDestroyNode((SNode*)pCol);
×
811
    nodesDestroyNode((SNode*)pCol1);
×
812
    nodesDestroyNode((SNode*)pVal);
×
813
    nodesDestroyNode((SNode*)pVal1);
×
814
    nodesDestroyNode((SNode*)op);
×
815
    nodesDestroyNode((SNode*)op1);
×
816
    nodesDestroyNode((SNode*)cond);
×
817
  }
818
  STREAM_PRINT_LOG_END(code, lino);
463!
819

820
  return code;
463✔
821
}
822

823
static int32_t createExternalConditions(SStreamRuntimeFuncInfo* data, SLogicConditionNode** pCond, STargetNode* pTargetNodeTs, STimeRangeNode* node) {
134✔
824
  int32_t              code = 0;
134✔
825
  int32_t              lino = 0;
134✔
826
  SLogicConditionNode* pAndCondition = NULL;
134✔
827
  SLogicConditionNode* cond = NULL;
134✔
828

829
  if (pTargetNodeTs == NULL) {
134!
830
    vError("stream reader %s no ts column", __func__);
×
831
    return TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN;
×
832
  }
833
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_LOGIC_CONDITION, (SNode**)&cond));
134!
834
  cond->condType = LOGIC_COND_TYPE_OR;
134✔
835
  cond->node.resType.type = TSDB_DATA_TYPE_BOOL;
134✔
836
  cond->node.resType.bytes = CHAR_BYTES;
134✔
837
  STREAM_CHECK_RET_GOTO(nodesMakeList(&cond->pParameterList));
134!
838

839
  for (int i = 0; i < taosArrayGetSize(data->pStreamPesudoFuncVals); ++i) {
597✔
840
    data->curIdx = i;
463✔
841

842
    SReadHandle handle = {0};
463✔
843
    calcTimeRange(node, data, &handle.winRange, &handle.winRangeValid);
463✔
844
    if (!handle.winRangeValid) {
463!
845
      stError("stream reader %s invalid time range, skey:%" PRId64 ", ekey:%" PRId64, __func__, handle.winRange.skey,
×
846
              handle.winRange.ekey);
847
      continue;
×
848
    }
849
    STREAM_CHECK_RET_GOTO(createTSAndCondition(handle.winRange.skey, handle.winRange.ekey, &pAndCondition, pTargetNodeTs));
463!
850
    stDebug("%s create condition skey:%" PRId64 ", eksy:%" PRId64, __func__, handle.winRange.skey, handle.winRange.ekey);
463✔
851
    STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)pAndCondition));
463!
852
    pAndCondition = NULL;
463✔
853
  }
854

855
  *pCond = cond;
134✔
856

857
end:
134✔
858
  if (code != 0) {
134!
859
    nodesDestroyNode((SNode*)pAndCondition);
×
860
    nodesDestroyNode((SNode*)cond);
×
861
  }
862
  STREAM_PRINT_LOG_END(code, lino);
134!
863

864
  return code;
134✔
865
}
866

867
static int32_t processCalaTimeRange(SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo, SResFetchReq* req,
16,217✔
868
                                    STimeRangeNode* node, SReadHandle* handle) {
869
  int32_t code = 0;
16,217✔
870
  int32_t lino = 0;
16,217✔
871
  SArray* funcVals = NULL;
16,217✔
872
  if (req->pStRtFuncInfo->withExternalWindow) {
16,217✔
873
    nodesDestroyNode(sStreamReaderCalcInfo->tsConditions);
134✔
874
    filterFreeInfo(sStreamReaderCalcInfo->pFilterInfo);
134✔
875
    sStreamReaderCalcInfo->pFilterInfo = NULL;
134✔
876

877
    STREAM_CHECK_RET_GOTO(createExternalConditions(req->pStRtFuncInfo,
134!
878
                                                   (SLogicConditionNode**)&sStreamReaderCalcInfo->tsConditions,
879
                                                   sStreamReaderCalcInfo->pTargetNodeTs, node));
880

881
    STREAM_CHECK_RET_GOTO(filterInitFromNode((SNode*)sStreamReaderCalcInfo->tsConditions,
134!
882
                                             (SFilterInfo**)&sStreamReaderCalcInfo->pFilterInfo,
883
                                             FLT_OPTION_NO_REWRITE | FLT_OPTION_SCALAR_MODE, NULL));
884
    SSTriggerCalcParam* pFirst = taosArrayGet(req->pStRtFuncInfo->pStreamPesudoFuncVals, 0);
134✔
885
    SSTriggerCalcParam* pLast = taosArrayGetLast(req->pStRtFuncInfo->pStreamPesudoFuncVals);
134✔
886
    STREAM_CHECK_NULL_GOTO(pFirst, terrno);
134!
887
    STREAM_CHECK_NULL_GOTO(pLast, terrno);
134!
888

889
    handle->winRange.skey = pFirst->wstart;
134✔
890
    handle->winRange.ekey = pLast->wend;
134✔
891
    handle->winRangeValid = true;
134✔
892
    stDebug("%s withExternalWindow is true, skey:%" PRId64 ", ekey:%" PRId64, __func__, pFirst->wstart, pLast->wend);
134✔
893
  } else {
894
    calcTimeRange(node, req->pStRtFuncInfo, &handle->winRange, &handle->winRangeValid);
16,083✔
895
  }
896

897
end:
16,217✔
898
  taosArrayDestroy(funcVals);
16,217✔
899
  return code;
16,217✔
900
}
901

902
static int32_t createBlockForWalMeta(SSDataBlock** pBlock, bool isVTable) {
8,884✔
903
  int32_t code = 0;
8,884✔
904
  int32_t lino = 0;
8,884✔
905
  SArray* schemas = NULL;
8,884✔
906

907
  schemas = taosArrayInit(8, sizeof(SSchema));
8,884✔
908
  STREAM_CHECK_NULL_GOTO(schemas, terrno);
8,889!
909

910
  int32_t index = 0;
8,889✔
911
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TINYINT, CHAR_BYTES, index++))  // type
8,889!
912
  if (!isVTable) {
8,890✔
913
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // gid
5,691!
914
  }
915
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // uid
8,887!
916
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // skey
8,879!
917
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // ekey
8,885!
918
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // ver
8,885!
919
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // nrows
8,880!
920

921
  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, pBlock));
8,884!
922

923
end:
8,900✔
924
  taosArrayDestroy(schemas);
8,900✔
925
  return code;
8,891✔
926
}
927

928
static int32_t createOptionsForLastTs(SStreamTriggerReaderTaskInnerOptions* options,
411✔
929
                                      SStreamTriggerReaderInfo*             sStreamReaderInfo) {
930
  int32_t code = 0;
411✔
931
  int32_t lino = 0;
411✔
932
  SArray* schemas = NULL;
411✔
933

934
  schemas = taosArrayInit(4, sizeof(SSchema));
411✔
935
  STREAM_CHECK_NULL_GOTO(schemas, terrno)
414!
936
  STREAM_CHECK_RET_GOTO(
414!
937
      qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, PRIMARYKEY_TIMESTAMP_COL_ID))  // last ts
938

939
  BUILD_OPTION(op, sStreamReaderInfo, -1, true, TSDB_ORDER_DESC, INT64_MIN, INT64_MAX, schemas, true,
414✔
940
               STREAM_SCAN_GROUP_ONE_BY_ONE, 0, sStreamReaderInfo->uidList == NULL, NULL);
941
  schemas = NULL;
414✔
942
  *options = op;
414✔
943

944
end:
414✔
945
  taosArrayDestroy(schemas);
414✔
946
  return code;
414✔
947
}
948

949
static int32_t createOptionsForFirstTs(SStreamTriggerReaderTaskInnerOptions* options,
361✔
950
                                       SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t start, int64_t ver) {
951
  int32_t code = 0;
361✔
952
  int32_t lino = 0;
361✔
953
  SArray* schemas = NULL;
361✔
954

955
  schemas = taosArrayInit(4, sizeof(SSchema));
361✔
956
  STREAM_CHECK_NULL_GOTO(schemas, terrno)
361!
957
  STREAM_CHECK_RET_GOTO(
361!
958
      qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, PRIMARYKEY_TIMESTAMP_COL_ID))  // first ts
959

960
  BUILD_OPTION(op, sStreamReaderInfo, ver, true, TSDB_ORDER_ASC, start, INT64_MAX, schemas, true,
361✔
961
               STREAM_SCAN_GROUP_ONE_BY_ONE, 0, sStreamReaderInfo->uidList == NULL, NULL);
962
  schemas = NULL;
361✔
963

964
  *options = op;
361✔
965
end:
361✔
966
  taosArrayDestroy(schemas);
361✔
967
  return code;
361✔
968
}
969

970
static int32_t createOptionsForTsdbMeta(SStreamTriggerReaderTaskInnerOptions* options,
1,176✔
971
                                        SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t start, int64_t end,
972
                                        int64_t gid, int8_t order, int64_t ver, bool onlyTs) {
973
  int32_t code = 0;
1,176✔
974
  int32_t lino = 0;
1,176✔
975
  SArray* schemas = NULL;
1,176✔
976

977
  int32_t index = 1;
1,176✔
978
  schemas = taosArrayInit(8, sizeof(SSchema));
1,176✔
979
  STREAM_CHECK_NULL_GOTO(schemas, terrno);
1,176!
980
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, index++))  // skey
1,176!
981
  if (!onlyTs){
1,176✔
982
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, index++))  // ekey
587!
983
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // uid
587!
984
    if (sStreamReaderInfo->uidList == NULL) {
587✔
985
      STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_UBIGINT, LONG_BYTES, index++))  // gid
98!
986
    }
987
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))     // nrows
587!
988
  }
989
  
990
  BUILD_OPTION(op, sStreamReaderInfo, ver, true, order, start, end, schemas, true, (gid != 0 ? STREAM_SCAN_GROUP_ONE_BY_ONE : STREAM_SCAN_ALL), gid,
1,176✔
991
               true, sStreamReaderInfo->uidList);
992
  schemas = NULL;
1,176✔
993
  *options = op;
1,176✔
994

995
end:
1,176✔
996
  taosArrayDestroy(schemas);
1,176✔
997
  return code;
1,176✔
998
}
999

1000
static int taosCompareInt64Asc(const void* elem1, const void* elem2) {
300✔
1001
  int64_t* node1 = (int64_t*)elem1;
300✔
1002
  int64_t* node2 = (int64_t*)elem2;
300✔
1003

1004
  if (*node1 < *node2) {
300!
1005
    return -1;
×
1006
  }
1007

1008
  return *node1 > *node2;
300✔
1009
}
1010

1011
static int32_t getTableList(SArray** pList, int32_t* pNum, int64_t* suid, int32_t index,
193✔
1012
                            SStreamTriggerReaderInfo* sStreamReaderInfo) {
1013
  int32_t code = 0;
193✔
1014
  int32_t lino = 0;
193✔
1015

1016
  int32_t* start = taosArrayGet(sStreamReaderInfo->uidListIndex, index);
193✔
1017
  STREAM_CHECK_NULL_GOTO(start, terrno);
193!
1018
  int32_t  iStart = *start;
193✔
1019
  int32_t* end = taosArrayGet(sStreamReaderInfo->uidListIndex, index + 1);
193✔
1020
  STREAM_CHECK_NULL_GOTO(end, terrno);
193!
1021
  int32_t iEnd = *end;
193✔
1022
  *pList = taosArrayInit(iEnd - iStart, sizeof(STableKeyInfo));
193✔
1023
  STREAM_CHECK_NULL_GOTO(*pList, terrno);
193!
1024
  for (int32_t i = iStart; i < iEnd; ++i) {
748✔
1025
    int64_t*       uid = taosArrayGet(sStreamReaderInfo->uidList, i);
555✔
1026
    STableKeyInfo* info = taosArrayReserve(*pList, 1);
555✔
1027
    STREAM_CHECK_NULL_GOTO(info, terrno);
555!
1028
    *suid = uid[0];
555✔
1029
    info->uid = uid[1];
555✔
1030
  }
1031
  *pNum = iEnd - iStart;
193✔
1032
end:
193✔
1033
  STREAM_PRINT_LOG_END(code, lino);
193!
1034
  return code;
193✔
1035
}
1036

1037
static int32_t processTsNonVTable(SVnode* pVnode, SStreamTsResponse* tsRsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
581✔
1038
                                  SStreamReaderTaskInner* pTask) {
1039
  int32_t code = 0;
581✔
1040
  int32_t lino = 0;
581✔
1041

1042
  tsRsp->tsInfo = taosArrayInit(qStreamGetTableListGroupNum(pTask->pTableList), sizeof(STsInfo));
581✔
1043
  STREAM_CHECK_NULL_GOTO(tsRsp->tsInfo, terrno);
582!
1044
  while (true) {
591✔
1045
    bool hasNext = false;
1,173✔
1046
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTask, &hasNext));
1,173!
1047
    if (hasNext) {
1,172✔
1048
      pTask->api.tsdReader.tsdReaderReleaseDataBlock(pTask->pReader);
712✔
1049
      STsInfo* tsInfo = taosArrayReserve(tsRsp->tsInfo, 1);
712✔
1050
      STREAM_CHECK_NULL_GOTO(tsInfo, terrno)
712!
1051
      if (pTask->options.order == TSDB_ORDER_ASC) {
712✔
1052
        tsInfo->ts = pTask->pResBlock->info.window.skey;
573✔
1053
      } else {
1054
        tsInfo->ts = pTask->pResBlock->info.window.ekey;
139✔
1055
      }
1056
      tsInfo->gId = qStreamGetGroupId(pTask->pTableList, pTask->pResBlock->info.id.uid);
712✔
1057
      stDebug("vgId:%d %s get last ts:%" PRId64 ", gId:%" PRIu64 ", ver:%" PRId64, TD_VID(pVnode), __func__, tsInfo->ts,
713✔
1058
              tsInfo->gId, tsRsp->ver);
1059
    }
1060

1061
    pTask->currentGroupIndex++;
1,173✔
1062
    if (pTask->currentGroupIndex >= qStreamGetTableListGroupNum(pTask->pTableList)) {
1,173✔
1063
      break;
582✔
1064
    }
1065
    STREAM_CHECK_RET_GOTO(resetTsdbReader(pTask));
591!
1066
  }
1067

1068
end:
582✔
1069
  return code;
582✔
1070
}
1071

1072
static int32_t processTsVTable(SVnode* pVnode, SStreamTsResponse* tsRsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
193✔
1073
                               SStreamReaderTaskInner* pTask, int64_t ver) {
1074
  int32_t code = 0;
193✔
1075
  int32_t lino = 0;
193✔
1076
  int64_t suid = 0;
193✔
1077
  SArray* pList = NULL;
193✔
1078
  int32_t pNum = 0;
193✔
1079

1080
  tsRsp->tsInfo = taosArrayInit(taosArrayGetSize(sStreamReaderInfo->uidList), sizeof(STsInfo));
193✔
1081
  STREAM_CHECK_NULL_GOTO(tsRsp->tsInfo, terrno);
193!
1082
  for (int32_t i = 0; i < taosArrayGetSize(sStreamReaderInfo->uidListIndex) - 1; ++i) {
386✔
1083
    STREAM_CHECK_RET_GOTO(getTableList(&pList, &pNum, &suid, i, sStreamReaderInfo));
193!
1084

1085
    cleanupQueryTableDataCond(&pTask->cond);
193✔
1086
    STREAM_CHECK_RET_GOTO(qStreamInitQueryTableDataCond(&pTask->cond, pTask->options.order, pTask->options.schemas,
193!
1087
                                                        pTask->options.isSchema, pTask->options.twindows, suid, ver));
1088
    STREAM_CHECK_RET_GOTO(pTask->api.tsdReader.tsdReaderOpen(
193!
1089
        pVnode, &pTask->cond, taosArrayGet(pList, 0), pNum, pTask->pResBlock, (void**)&pTask->pReader, pTask->idStr, NULL));
1090
    taosArrayDestroy(pList);
193✔
1091
    pList = NULL;
193✔
1092
    while (true) {
465✔
1093
      bool hasNext = false;
658✔
1094
      STREAM_CHECK_RET_GOTO(getTableDataInfo(pTask, &hasNext));
658!
1095
      if (!hasNext) {
658✔
1096
        break;
193✔
1097
      }
1098
      pTask->api.tsdReader.tsdReaderReleaseDataBlock(pTask->pReader);
465✔
1099
      STsInfo* tsInfo = taosArrayReserve(tsRsp->tsInfo, 1);
465✔
1100
      STREAM_CHECK_NULL_GOTO(tsInfo, terrno)
465!
1101
      if (pTask->options.order == TSDB_ORDER_ASC) {
465✔
1102
        tsInfo->ts = pTask->pResBlock->info.window.skey;
195✔
1103
      } else {
1104
        tsInfo->ts = pTask->pResBlock->info.window.ekey;
270✔
1105
      }
1106
      tsInfo->gId = pTask->pResBlock->info.id.uid;
465✔
1107
      stDebug("vgId:%d %s get vtable last ts:%" PRId64 ", uid:%" PRIu64 ", ver:%" PRId64, TD_VID(pVnode), __func__,
465✔
1108
              tsInfo->ts, tsInfo->gId, tsRsp->ver);
1109
    }
1110
    pTask->api.tsdReader.tsdReaderClose(pTask->pReader);
193✔
1111
    pTask->pReader = NULL;
193✔
1112
  }
1113

1114
end:
193✔
1115
  taosArrayDestroy(pList);
193✔
1116
  return code;
193✔
1117
}
1118

1119
static void reSetUid(SStreamTriggerReaderTaskInnerOptions* options, int64_t suid, int64_t uid) {
408✔
1120
  if (suid != 0) options->suid = suid;
408✔
1121
  options->uid = uid;
408✔
1122
  if (options->suid != 0) {
408✔
1123
    options->tableType = TD_CHILD_TABLE;
404✔
1124
  } else {
1125
    options->tableType = TD_NORMAL_TABLE;
4✔
1126
  }
1127
}
408✔
1128

1129
static int32_t createOptionsForTsdbData(SVnode* pVnode, SStreamTriggerReaderTaskInnerOptions* options,
375✔
1130
                                        SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t uid, SArray* cols,
1131
                                        int8_t order,int64_t skey, int64_t ekey, int64_t ver) {
1132
  int32_t code = 0;
375✔
1133
  int32_t lino = 0;
375✔
1134
  SArray* schemas = NULL;
375✔
1135

1136
  STREAM_CHECK_RET_GOTO(buildScheamFromMeta(pVnode, uid, &schemas));
375!
1137
  STREAM_CHECK_RET_GOTO(shrinkScheams(cols, schemas));
375!
1138
  BUILD_OPTION(op, sStreamReaderInfo, ver, true, order, skey, ekey, schemas, true, STREAM_SCAN_ALL, 0, false, NULL);
375✔
1139
  *options = op;
375✔
1140

1141
end:
375✔
1142
  return code;
375✔
1143
}
1144

1145
static int32_t vnodeProcessStreamSetTableReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
140✔
1146
  int32_t code = 0;
140✔
1147
  int32_t lino = 0;
140✔
1148
  void*   buf = NULL;
140✔
1149
  size_t  size = 0;
140✔
1150
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
140!
1151
  void* pTask = sStreamReaderInfo->pTask;
140✔
1152

1153
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
140✔
1154

1155
  TSWAP(sStreamReaderInfo->uidList, req->setTableReq.uids);
140✔
1156
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidList, TSDB_CODE_INVALID_PARA);
140!
1157

1158
  taosArraySort(sStreamReaderInfo->uidList, taosCompareInt64Asc);
140✔
1159
  if (sStreamReaderInfo->uidListIndex != NULL) {
140!
1160
    taosArrayClear(sStreamReaderInfo->uidListIndex);
×
1161
  } else {
1162
    sStreamReaderInfo->uidListIndex = taosArrayInit(taosArrayGetSize(sStreamReaderInfo->uidList), sizeof(int32_t));
140✔
1163
    STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidListIndex, TSDB_CODE_INVALID_PARA);
140!
1164
  }
1165

1166
  if (sStreamReaderInfo->uidHash != NULL) {
140!
1167
    taosHashClear(sStreamReaderInfo->uidHash);
×
1168
  } else {
1169
    sStreamReaderInfo->uidHash = taosHashInit(taosArrayGetSize(sStreamReaderInfo->uidList),
140✔
1170
                                              taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
1171
    STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidHash, TSDB_CODE_INVALID_PARA);
140!
1172
  }
1173

1174
  int64_t suid = 0;
140✔
1175
  int32_t cnt = taosArrayGetSize(sStreamReaderInfo->uidList);
140✔
1176
  for (int32_t i = 0; i < cnt; ++i) {
478✔
1177
    int64_t* data = taosArrayGet(sStreamReaderInfo->uidList, i);
338✔
1178
    STREAM_CHECK_NULL_GOTO(data, terrno);
338!
1179
    if (*data == 0 || *data != suid) {
338✔
1180
      STREAM_CHECK_NULL_GOTO(taosArrayPush(sStreamReaderInfo->uidListIndex, &i), terrno);
280!
1181
    }
1182
    suid = *data;
338✔
1183
    stDebug("vgId:%d %s suid:%" PRId64 ",uid:%" PRId64 ", index:%d", TD_VID(pVnode), __func__, suid, data[1], i);
338✔
1184
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->uidHash, data + 1, LONG_BYTES, &i, sizeof(int32_t)));
338!
1185
  }
1186
  STREAM_CHECK_NULL_GOTO(taosArrayPush(sStreamReaderInfo->uidListIndex, &cnt), terrno);
280!
1187

1188
end:
140✔
1189
  STREAM_PRINT_LOG_END_WITHID(code, lino);
140!
1190
  SRpcMsg rsp = {
140✔
1191
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1192
  tmsgSendRsp(&rsp);
140✔
1193
  return code;
140✔
1194
}
1195

1196
static int32_t vnodeProcessStreamLastTsReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
412✔
1197
  int32_t                 code = 0;
412✔
1198
  int32_t                 lino = 0;
412✔
1199
  SArray*                 schemas = NULL;
412✔
1200
  SStreamReaderTaskInner* pTaskInner = NULL;
412✔
1201
  SStreamTsResponse       lastTsRsp = {0};
412✔
1202
  void*                   buf = NULL;
412✔
1203
  size_t                  size = 0;
412✔
1204

1205
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
412!
1206
  void* pTask = sStreamReaderInfo->pTask;
412✔
1207

1208
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
412✔
1209

1210
  SStreamTriggerReaderTaskInnerOptions options = {0};
412✔
1211
  STREAM_CHECK_RET_GOTO(createOptionsForLastTs(&options, sStreamReaderInfo));
412!
1212
  SStorageAPI api = {0};
414✔
1213
  initStorageAPI(&api);
414✔
1214
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, NULL, &api));
414!
1215

1216
  lastTsRsp.ver = pVnode->state.applied;
414✔
1217
  if (sStreamReaderInfo->uidList != NULL) {
414✔
1218
    STREAM_CHECK_RET_GOTO(processTsVTable(pVnode, &lastTsRsp, sStreamReaderInfo, pTaskInner, -1));
139!
1219
  } else {
1220
    STREAM_CHECK_RET_GOTO(processTsNonVTable(pVnode, &lastTsRsp, sStreamReaderInfo, pTaskInner));
275!
1221
  }
1222
  ST_TASK_DLOG("vgId:%d %s get result", TD_VID(pVnode), __func__);
414✔
1223
  STREAM_CHECK_RET_GOTO(buildTsRsp(&lastTsRsp, &buf, &size))
414!
1224

1225
end:
414✔
1226
  STREAM_PRINT_LOG_END_WITHID(code, lino);
414!
1227
  SRpcMsg rsp = {
414✔
1228
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1229
  tmsgSendRsp(&rsp);
414✔
1230
  taosArrayDestroy(lastTsRsp.tsInfo);
414✔
1231
  releaseStreamTask(&pTaskInner);
414✔
1232
  return code;
414✔
1233
}
1234

1235
static int32_t vnodeProcessStreamFirstTsReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
361✔
1236
  int32_t                 code = 0;
361✔
1237
  int32_t                 lino = 0;
361✔
1238
  SStreamReaderTaskInner* pTaskInner = NULL;
361✔
1239
  SStreamTsResponse       firstTsRsp = {0};
361✔
1240
  void*                   buf = NULL;
361✔
1241
  size_t                  size = 0;
361✔
1242

1243
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
361!
1244
  void* pTask = sStreamReaderInfo->pTask;
361✔
1245
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
361✔
1246
  SStreamTriggerReaderTaskInnerOptions options = {0};
361✔
1247
  STREAM_CHECK_RET_GOTO(createOptionsForFirstTs(&options, sStreamReaderInfo, req->firstTsReq.startTime, req->firstTsReq.ver));
361!
1248
  SStorageAPI api = {0};
361✔
1249
  initStorageAPI(&api);
361✔
1250
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, NULL, &api));
361!
1251
  
1252
  firstTsRsp.ver = pVnode->state.applied;
361✔
1253
  if (sStreamReaderInfo->uidList != NULL) {
361✔
1254
    STREAM_CHECK_RET_GOTO(processTsVTable(pVnode, &firstTsRsp, sStreamReaderInfo, pTaskInner, req->firstTsReq.ver));
54!
1255
  } else {
1256
    STREAM_CHECK_RET_GOTO(processTsNonVTable(pVnode, &firstTsRsp, sStreamReaderInfo, pTaskInner));
307!
1257
  }
1258

1259
  ST_TASK_DLOG("vgId:%d %s get result", TD_VID(pVnode), __func__);
361✔
1260
  STREAM_CHECK_RET_GOTO(buildTsRsp(&firstTsRsp, &buf, &size));
361!
1261

1262
end:
361✔
1263
  STREAM_PRINT_LOG_END_WITHID(code, lino);
361!
1264
  SRpcMsg rsp = {
361✔
1265
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1266
  tmsgSendRsp(&rsp);
361✔
1267
  taosArrayDestroy(firstTsRsp.tsInfo);
361✔
1268
  releaseStreamTask(&pTaskInner);
361✔
1269
  return code;
361✔
1270
}
1271

1272
static int32_t vnodeProcessStreamTsdbMetaReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
589✔
1273
  int32_t code = 0;
589✔
1274
  int32_t lino = 0;
589✔
1275
  void*   buf = NULL;
589✔
1276
  size_t  size = 0;
589✔
1277
  SStreamTriggerReaderTaskInnerOptions options = {0};
589✔
1278

1279
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
589!
1280
  void* pTask = sStreamReaderInfo->pTask;
589✔
1281
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
589✔
1282

1283
  SStreamReaderTaskInner* pTaskInner = NULL;
589✔
1284
  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_META);
589✔
1285

1286
  if (req->base.type == STRIGGER_PULL_TSDB_META) {
589!
1287
    SStreamTriggerReaderTaskInnerOptions optionsTs = {0};
589✔
1288

1289
    STREAM_CHECK_RET_GOTO(createOptionsForTsdbMeta(&optionsTs, sStreamReaderInfo, req->tsdbMetaReq.startTime,
591!
1290
      req->tsdbMetaReq.endTime, req->tsdbMetaReq.gid, req->tsdbMetaReq.order, req->tsdbMetaReq.ver, true));
1291
    SStorageAPI api = {0};
589✔
1292
    initStorageAPI(&api);
589✔
1293
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &optionsTs, &pTaskInner, NULL, sStreamReaderInfo->groupIdMap, &api));
589✔
1294
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
587!
1295
    
1296
    STREAM_CHECK_RET_GOTO(createOptionsForTsdbMeta(&options, sStreamReaderInfo, req->tsdbMetaReq.startTime,
587!
1297
      req->tsdbMetaReq.endTime, req->tsdbMetaReq.gid, req->tsdbMetaReq.order, req->tsdbMetaReq.ver, false));
1298
    STREAM_CHECK_RET_GOTO(createDataBlockForStream(options.schemas, &pTaskInner->pResBlockDst));
587!
1299
  } else {
1300
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
×
1301
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
×
1302
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
×
1303
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
×
1304
  }
1305

1306
  blockDataCleanup(pTaskInner->pResBlockDst);
587✔
1307
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pTaskInner->pResBlockDst, STREAM_RETURN_ROWS_NUM));
587!
1308
  bool hasNext = true;
587✔
1309
  while (true) {
216✔
1310
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
803✔
1311
    if (!hasNext) {
797✔
1312
      break;
581✔
1313
    }
1314
    pTaskInner->api.tsdReader.tsdReaderReleaseDataBlock(pTaskInner->pReader);
216✔
1315
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
216✔
1316

1317
    int32_t index = 0;
216✔
1318
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.window.skey));
216!
1319
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.window.ekey));
216!
1320
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.id.uid));
216!
1321
    if (sStreamReaderInfo->uidList == NULL) {
216✔
1322
      STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.id.groupId));
32!
1323
    }
1324
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.rows));
216!
1325

1326
    stDebug("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
216✔
1327
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
1328
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
1329
            pTaskInner->pResBlockDst->info.rows++;
216✔
1330
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
216!
1331
      break;
×
1332
    }
1333
  }
1334

1335
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
581✔
1336
  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
581!
1337
  if (!hasNext) {
581!
1338
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
581!
1339
  }
1340

1341
end:
581✔
1342
  STREAM_PRINT_LOG_END_WITHID(code, lino);
589!
1343
  SRpcMsg rsp = {
589✔
1344
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1345
  tmsgSendRsp(&rsp);
589✔
1346
  taosArrayDestroy(options.schemas);
589✔
1347
  return code;
589✔
1348
}
1349

1350
static int32_t vnodeProcessStreamTsdbTsDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
33✔
1351
  int32_t                 code = 0;
33✔
1352
  int32_t                 lino = 0;
33✔
1353
  SStreamReaderTaskInner* pTaskInner = NULL;
33✔
1354
  void*                   buf = NULL;
33✔
1355
  size_t                  size = 0;
33✔
1356
  SSDataBlock*            pBlockRes = NULL;
33✔
1357

1358
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
33!
1359
  void* pTask = sStreamReaderInfo->pTask;
33✔
1360
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
33!
1361

1362
  BUILD_OPTION(options, sStreamReaderInfo, req->tsdbTsDataReq.ver, true, TSDB_ORDER_ASC, req->tsdbTsDataReq.skey, req->tsdbTsDataReq.ekey,
33✔
1363
               sStreamReaderInfo->triggerCols, false, STREAM_SCAN_ALL, 0, true, NULL);
1364
  reSetUid(&options, req->tsdbTsDataReq.suid, req->tsdbTsDataReq.uid);
33✔
1365
  SStorageAPI api = {0};
33✔
1366
  initStorageAPI(&api);
33✔
1367
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, NULL, &api));
33!
1368
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
33!
1369
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->tsBlock, false, &pBlockRes));
33!
1370

1371
  while (1) {
33✔
1372
    bool hasNext = false;
66✔
1373
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
66!
1374
    if (!hasNext) {
66✔
1375
      break;
33✔
1376
    }
1377
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
33✔
1378

1379
    SSDataBlock* pBlock = NULL;
33✔
1380
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
33!
1381
    if (pBlock != NULL && pBlock->info.rows > 0) {
33!
1382
      STREAM_CHECK_RET_GOTO(processTag(pVnode, sStreamReaderInfo->pExprInfo, sStreamReaderInfo->numOfExpr, &api, pBlock));
33!
1383
    }
1384
    
1385
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, pTaskInner->pFilterInfo));
33!
1386
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
33!
1387
    ST_TASK_DLOG("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
33!
1388
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
1389
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
1390
  }
1391

1392
  blockDataTransform(pBlockRes, pTaskInner->pResBlockDst);
33✔
1393

1394
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
33!
1395
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));
33!
1396

1397
end:
33✔
1398
  STREAM_PRINT_LOG_END_WITHID(code, lino);
33!
1399
  SRpcMsg rsp = {
33✔
1400
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1401
  tmsgSendRsp(&rsp);
33✔
1402
  blockDataDestroy(pBlockRes);
33✔
1403

1404
  releaseStreamTask(&pTaskInner);
33✔
1405
  return code;
33✔
1406
}
1407

1408
static int32_t vnodeProcessStreamTsdbTriggerDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
341✔
1409
  int32_t code = 0;
341✔
1410
  int32_t lino = 0;
341✔
1411
  void*   buf = NULL;
341✔
1412
  size_t  size = 0;
341✔
1413

1414
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
341!
1415
  SStreamReaderTaskInner* pTaskInner = NULL;
341✔
1416
  void* pTask = sStreamReaderInfo->pTask;
341✔
1417
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
341✔
1418
  
1419
  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_TRIGGER_DATA);
341✔
1420

1421
  if (req->base.type == STRIGGER_PULL_TSDB_TRIGGER_DATA) {
341✔
1422
    BUILD_OPTION(options, sStreamReaderInfo, req->tsdbTriggerDataReq.ver, true, req->tsdbTriggerDataReq.order, req->tsdbTriggerDataReq.startTime, INT64_MAX,
165✔
1423
                 sStreamReaderInfo->triggerCols, false, (req->tsdbTriggerDataReq.gid != 0 ? STREAM_SCAN_GROUP_ONE_BY_ONE : STREAM_SCAN_ALL), 
1424
                 req->tsdbTriggerDataReq.gid, true, NULL);
1425
    SStorageAPI api = {0};
165✔
1426
    initStorageAPI(&api);
165✔
1427
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock,
165!
1428
                                           sStreamReaderInfo->groupIdMap, &api));
1429
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
165!
1430
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
165!
1431
  } else {
1432
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
176✔
1433
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
176!
1434
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
176✔
1435
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
176!
1436
  }
1437

1438
  blockDataCleanup(pTaskInner->pResBlockDst);
341✔
1439
  bool hasNext = true;
341✔
1440
  while (1) {
×
1441
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
341!
1442
    if (!hasNext) {
341✔
1443
      break;
165✔
1444
    }
1445
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
176✔
1446
    pTaskInner->pResBlockDst->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
176✔
1447

1448
    SSDataBlock* pBlock = NULL;
176✔
1449
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
176!
1450
    if (pBlock != NULL && pBlock->info.rows > 0) {
176!
1451
      STREAM_CHECK_RET_GOTO(
176!
1452
        processTag(pVnode, sStreamReaderInfo->pExprInfo, sStreamReaderInfo->numOfExpr, &pTaskInner->api, pBlock));
1453
    }
1454
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, pTaskInner->pFilterInfo));
176!
1455
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
176!
1456
    ST_TASK_DLOG("vgId:%d %s get skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
176✔
1457
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
1458
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
1459
    if (pTaskInner->pResBlockDst->info.rows >= 0) { //todo
176!
1460
      break;
176✔
1461
    }
1462
  }
1463

1464
  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
341!
1465
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
341✔
1466
  if (!hasNext) {
341✔
1467
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
165!
1468
  }
1469

1470
end:
341✔
1471
  STREAM_PRINT_LOG_END_WITHID(code, lino);
341!
1472
  SRpcMsg rsp = {
341✔
1473
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1474
  tmsgSendRsp(&rsp);
341✔
1475

1476
  return code;
341✔
1477
}
1478

1479
static int32_t vnodeProcessStreamTsdbCalcDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
28,791✔
1480
  int32_t code = 0;
28,791✔
1481
  int32_t lino = 0;
28,791✔
1482
  void*   buf = NULL;
28,791✔
1483
  size_t  size = 0;
28,791✔
1484
  SSDataBlock*            pBlockRes = NULL;
28,791✔
1485

1486
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
28,791!
1487
  void* pTask = sStreamReaderInfo->pTask;
28,791✔
1488
  ST_TASK_DLOG("vgId:%d %s start, skey:%"PRId64",ekey:%"PRId64",gid:%"PRId64, TD_VID(pVnode), __func__, 
28,791✔
1489
    req->tsdbCalcDataReq.skey, req->tsdbCalcDataReq.ekey, req->tsdbCalcDataReq.gid);
1490

1491
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->triggerCols, TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN);
28,791!
1492

1493
  SStreamReaderTaskInner* pTaskInner = NULL;
28,791✔
1494
  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_CALC_DATA);
28,791✔
1495

1496
  if (req->base.type == STRIGGER_PULL_TSDB_CALC_DATA) {
28,791!
1497
    BUILD_OPTION(options, sStreamReaderInfo, req->tsdbCalcDataReq.ver, true, TSDB_ORDER_ASC, req->tsdbCalcDataReq.skey, req->tsdbCalcDataReq.ekey,
28,791✔
1498
                 sStreamReaderInfo->triggerCols, false, STREAM_SCAN_GROUP_ONE_BY_ONE, req->tsdbCalcDataReq.gid, true, NULL);
1499
    SStorageAPI api = {0};
28,791✔
1500
    initStorageAPI(&api);
28,791✔
1501
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, NULL, &api));
28,790✔
1502

1503
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
28,498!
1504
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
28,498!
1505
  } else {
1506
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
×
1507
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
×
1508
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
×
1509
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
×
1510
  }
1511

1512
  blockDataCleanup(pTaskInner->pResBlockDst);
28,497✔
1513
  bool hasNext = true;
28,497✔
1514
  while (1) {
2,802✔
1515
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
31,299!
1516
    if (!hasNext) {
31,293✔
1517
      break;
28,496✔
1518
    }
1519
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
2,797✔
1520

1521
    SSDataBlock* pBlock = NULL;
2,800✔
1522
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
2,800!
1523
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, pTaskInner->pFilterInfo));
2,800!
1524
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
2,796!
1525
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
2,802!
1526
      break;
×
1527
    }
1528
  }
1529

1530
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->calcResBlock, false, &pBlockRes));
28,496!
1531
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlockRes, pTaskInner->pResBlockDst->info.capacity));
28,496!
1532
  blockDataTransform(pBlockRes, pTaskInner->pResBlockDst);
28,494✔
1533
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));
28,498!
1534
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlockRes->info.rows);
28,497✔
1535
  if (!hasNext) {
28,498!
1536
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
28,498!
1537
  }
1538

1539
end:
28,498✔
1540
  STREAM_PRINT_LOG_END_WITHID(code, lino);
28,791!
1541
  SRpcMsg rsp = {
28,793✔
1542
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1543
  tmsgSendRsp(&rsp);
28,793✔
1544
  blockDataDestroy(pBlockRes);
28,789✔
1545
  return code;
28,790✔
1546
}
1547

1548
static int32_t vnodeProcessStreamTsdbVirtalDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
375✔
1549
  int32_t code = 0;
375✔
1550
  int32_t lino = 0;
375✔
1551
  void*   buf = NULL;
375✔
1552
  size_t  size = 0;
375✔
1553

1554
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
375!
1555
  void* pTask = sStreamReaderInfo->pTask;
375✔
1556
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
375✔
1557

1558
  SStreamReaderTaskInner* pTaskInner = NULL;
375✔
1559
  int64_t key = req->tsdbDataReq.uid;
375✔
1560

1561
  if (req->base.type == STRIGGER_PULL_TSDB_DATA) {
375!
1562
    SStreamTriggerReaderTaskInnerOptions options = {0};
375✔
1563

1564
    STREAM_CHECK_RET_GOTO(createOptionsForTsdbData(pVnode, &options, sStreamReaderInfo, req->tsdbDataReq.uid,
375!
1565
                                                   req->tsdbDataReq.cids, req->tsdbDataReq.order, req->tsdbDataReq.skey,
1566
                                                   req->tsdbDataReq.ekey, req->tsdbDataReq.ver));
1567
    reSetUid(&options, req->tsdbDataReq.suid, req->tsdbDataReq.uid);
375✔
1568

1569
    SStorageAPI api = {0};
375✔
1570
    initStorageAPI(&api);
375✔
1571
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, NULL, &api));
375!
1572

1573
    STableKeyInfo       keyInfo = {.uid = req->tsdbDataReq.uid};
375✔
1574
    cleanupQueryTableDataCond(&pTaskInner->cond);
375✔
1575
    STREAM_CHECK_RET_GOTO(qStreamInitQueryTableDataCond(&pTaskInner->cond, pTaskInner->options.order, pTaskInner->options.schemas,
375!
1576
                                                        pTaskInner->options.isSchema, pTaskInner->options.twindows,
1577
                                                        pTaskInner->options.suid, pTaskInner->options.ver));
1578
    STREAM_CHECK_RET_GOTO(pTaskInner->api.tsdReader.tsdReaderOpen(pVnode, &pTaskInner->cond, &keyInfo, 1, pTaskInner->pResBlock,
375!
1579
                                                             (void**)&pTaskInner->pReader, pTaskInner->idStr, NULL));
1580

1581
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
375!
1582
    STREAM_CHECK_RET_GOTO(createOneDataBlock(pTaskInner->pResBlock, false, &pTaskInner->pResBlockDst));
375!
1583
  } else {
1584
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
×
1585
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
×
1586
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
×
1587
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
×
1588
  }
1589

1590
  blockDataCleanup(pTaskInner->pResBlockDst);
375✔
1591
  bool hasNext = true;
375✔
1592
  while (1) {
377✔
1593
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
752!
1594
    if (!hasNext) {
752✔
1595
      break;
375✔
1596
    }
1597

1598
    SSDataBlock* pBlock = NULL;
377✔
1599
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
377!
1600
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
377!
1601
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
377!
1602
      break;
×
1603
    }
1604
  }
1605
  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
375!
1606
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
375✔
1607
  if (!hasNext) {
375!
1608
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
375!
1609
  }
1610

1611
end:
375✔
1612
  STREAM_PRINT_LOG_END_WITHID(code, lino);
375!
1613
  SRpcMsg rsp = {
375✔
1614
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1615
  tmsgSendRsp(&rsp);
375✔
1616

1617
  return code;
375✔
1618
}
1619

1620
static int32_t vnodeProcessStreamWalMetaReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
8,892✔
1621
  int32_t      code = 0;
8,892✔
1622
  int32_t      lino = 0;
8,892✔
1623
  void*        buf = NULL;
8,892✔
1624
  size_t       size = 0;
8,892✔
1625
  SSDataBlock* pBlock = NULL;
8,892✔
1626
  void*        pTableList = NULL;
8,892✔
1627
  SNodeList*   groupNew = NULL;
8,892✔
1628
  int64_t      lastVer = 0;
8,892✔
1629

1630
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
8,892✔
1631
  void* pTask = sStreamReaderInfo->pTask;
8,872✔
1632
  ST_TASK_DLOG("vgId:%d %s start, request paras lastVer:%" PRId64 ",ctime:%" PRId64, TD_VID(pVnode), __func__,
8,872✔
1633
  req->walMetaReq.lastVer, req->walMetaReq.ctime);
1634

1635
  bool isVTable = sStreamReaderInfo->uidList != NULL;
8,872✔
1636
  if (sStreamReaderInfo->uidList == NULL) {
8,872✔
1637
    SStorageAPI api = {0};
5,682✔
1638
    initStorageAPI(&api);
5,682✔
1639
    STREAM_CHECK_RET_GOTO(nodesCloneList(sStreamReaderInfo->partitionCols, &groupNew));
5,693!
1640
    STREAM_CHECK_RET_GOTO(qStreamCreateTableListForReader(
5,690!
1641
        pVnode, sStreamReaderInfo->suid, sStreamReaderInfo->uid, sStreamReaderInfo->tableType,
1642
        groupNew, false, sStreamReaderInfo->pTagCond, sStreamReaderInfo->pTagIndexCond, &api,
1643
        &pTableList, sStreamReaderInfo->groupIdMap));
1644
  }
1645

1646
  STREAM_CHECK_RET_GOTO(createBlockForWalMeta(&pBlock, isVTable));
8,878!
1647
  STREAM_CHECK_RET_GOTO(scanWal(pVnode, isVTable ? sStreamReaderInfo->uidHash : pTableList, isVTable, pBlock,
8,889!
1648
                                req->walMetaReq.lastVer, sStreamReaderInfo->deleteReCalc,
1649
                                sStreamReaderInfo->deleteOutTbl, req->walMetaReq.ctime, &lastVer));
1650

1651
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlock->info.rows);
8,896✔
1652
  STREAM_CHECK_RET_GOTO(buildRsp(pBlock, &buf, &size));
8,896!
1653
  printDataBlock(pBlock, __func__, "");
8,883✔
1654

1655
end:
8,899✔
1656
  if (pBlock != NULL && pBlock->info.rows == 0) {
8,899✔
1657
    code = TSDB_CODE_STREAM_NO_DATA;
8,327✔
1658
    buf = rpcMallocCont(sizeof(int64_t));
8,327✔
1659
    *(int64_t *)buf = lastVer;
8,336✔
1660
    size = sizeof(int64_t);
8,336✔
1661
  }
1662
  SRpcMsg rsp = {
8,908✔
1663
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1664
  tmsgSendRsp(&rsp);
8,908✔
1665
  if (code == TSDB_CODE_STREAM_NO_DATA){
8,916✔
1666
    code = 0;
8,339✔
1667
  }
1668
  STREAM_PRINT_LOG_END_WITHID(code, lino);
8,916!
1669
  nodesDestroyList(groupNew);
8,916✔
1670
  blockDataDestroy(pBlock);
8,916✔
1671
  qStreamDestroyTableList(pTableList);
8,917✔
1672

1673
  return code;
8,916✔
1674
}
1675

1676
static int32_t vnodeProcessStreamWalDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
17,278✔
1677
  int32_t      code = 0;
17,278✔
1678
  int32_t      lino = 0;
17,278✔
1679
  void*        buf = NULL;
17,278✔
1680
  size_t       size = 0;
17,278✔
1681
  SSDataBlock* pBlock = NULL;
17,278✔
1682

1683
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
17,278!
1684
  void* pTask = sStreamReaderInfo->pTask;
17,278✔
1685
  ST_TASK_DLOG("vgId:%d %s start, request type:%d skey:%" PRId64 ",ekey:%" PRId64 ",uid:%" PRId64 ",ver:%" PRId64, TD_VID(pVnode),
17,278✔
1686
  __func__, req->walDataReq.base.type, req->walDataReq.skey, req->walDataReq.ekey, req->walDataReq.uid, req->walDataReq.ver);
1687

1688
  STimeWindow window = {.skey = req->walDataReq.skey, .ekey = req->walDataReq.ekey};
17,279✔
1689
  if (req->walDataReq.base.type == STRIGGER_PULL_WAL_DATA){
17,279✔
1690
    STREAM_CHECK_RET_GOTO(processWalVerDataVTable(pVnode, req->walDataReq.cids, req->walDataReq.ver,
11,073!
1691
      req->walDataReq.uid, &window, &pBlock));
1692
  } else if (req->walDataReq.base.type == STRIGGER_PULL_WAL_CALC_DATA){
6,206✔
1693
    STREAM_CHECK_RET_GOTO(processWalVerData(pVnode, sStreamReaderInfo, req->walDataReq.ver, false,
3,201!
1694
      req->walDataReq.uid, &window, sStreamReaderInfo->calcResBlock, &pBlock));
1695
  } else if (req->walDataReq.base.type == STRIGGER_PULL_WAL_TRIGGER_DATA) {
3,005✔
1696
    STREAM_CHECK_RET_GOTO(processWalVerData(pVnode, sStreamReaderInfo, req->walDataReq.ver, true,
2,333!
1697
      req->walDataReq.uid, &window, sStreamReaderInfo->triggerResBlock, &pBlock));
1698
  } else if (req->walDataReq.base.type == STRIGGER_PULL_WAL_TS_DATA){
672!
1699
    STREAM_CHECK_RET_GOTO(processWalVerData(pVnode, sStreamReaderInfo, req->walDataReq.ver, false,
674!
1700
      req->walDataReq.uid, &window, sStreamReaderInfo->tsBlock, &pBlock));
1701

1702
  }
1703
  
1704
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlock->info.rows);
17,276✔
1705
  printDataBlock(pBlock, __func__, "");
17,276✔
1706
  STREAM_CHECK_RET_GOTO(buildRsp(pBlock, &buf, &size));
17,272!
1707
end:
17,280✔
1708
  STREAM_PRINT_LOG_END_WITHID(code, lino);
17,280!
1709
  SRpcMsg rsp = {
17,280✔
1710
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1711
  tmsgSendRsp(&rsp);
17,280✔
1712

1713
  blockDataDestroy(pBlock);
17,277✔
1714
  return code;
17,286✔
1715
}
1716

1717
static int32_t vnodeProcessStreamGroupColValueReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
713✔
1718
  int32_t code = 0;
713✔
1719
  int32_t lino = 0;
713✔
1720
  void*   buf = NULL;
713✔
1721
  size_t  size = 0;
713✔
1722

1723
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
713!
1724
  void* pTask = sStreamReaderInfo->pTask;
713✔
1725
  ST_TASK_DLOG("vgId:%d %s start, request gid:%" PRId64, TD_VID(pVnode), __func__, req->groupColValueReq.gid);
713✔
1726

1727
  SArray** gInfo = taosHashGet(sStreamReaderInfo->groupIdMap, &req->groupColValueReq.gid, POINTER_BYTES);
713✔
1728
  STREAM_CHECK_NULL_GOTO(gInfo, TSDB_CODE_STREAM_NO_CONTEXT);
713!
1729
  SStreamGroupInfo pGroupInfo = {0};
713✔
1730
  pGroupInfo.gInfo = *gInfo;
713✔
1731

1732
  size = tSerializeSStreamGroupInfo(NULL, 0, &pGroupInfo, TD_VID(pVnode));
713✔
1733
  STREAM_CHECK_CONDITION_GOTO(size < 0, size);
1734
  buf = rpcMallocCont(size);
713✔
1735
  STREAM_CHECK_NULL_GOTO(buf, terrno);
713!
1736
  size = tSerializeSStreamGroupInfo(buf, size, &pGroupInfo, TD_VID(pVnode));
713✔
1737
  STREAM_CHECK_CONDITION_GOTO(size < 0, size);
1738
end:
713✔
1739
  if (code != 0) {
713!
1740
    rpcFreeCont(buf);
×
1741
    buf = NULL;
×
1742
    size = 0;
×
1743
  }
1744
  STREAM_PRINT_LOG_END_WITHID(code, lino);
713!
1745
  SRpcMsg rsp = {
713✔
1746
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1747
  tmsgSendRsp(&rsp);
713✔
1748

1749
  return code;
713✔
1750
}
1751

1752
static int32_t vnodeProcessStreamVTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
556✔
1753
  int32_t              code = 0;
556✔
1754
  int32_t              lino = 0;
556✔
1755
  void*                buf = NULL;
556✔
1756
  size_t               size = 0;
556✔
1757
  SStreamMsgVTableInfo vTableInfo = {0};
556✔
1758
  SMetaReader          metaReader = {0};
556✔
1759
  SNodeList*           groupNew = NULL;
556✔
1760
  void*                pTableList = NULL;
556✔
1761
  SStorageAPI api = {0};
556✔
1762
  initStorageAPI(&api);
556✔
1763

1764
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
556!
1765
  void* pTask = sStreamReaderInfo->pTask;
556✔
1766
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
556✔
1767

1768
  STREAM_CHECK_RET_GOTO(nodesCloneList(sStreamReaderInfo->partitionCols, &groupNew));
556!
1769
  STREAM_CHECK_RET_GOTO(qStreamCreateTableListForReader(
555!
1770
      pVnode, sStreamReaderInfo->suid, sStreamReaderInfo->uid, sStreamReaderInfo->tableType,
1771
      groupNew, true, sStreamReaderInfo->pTagCond, sStreamReaderInfo->pTagIndexCond, &api,
1772
      &pTableList, sStreamReaderInfo->groupIdMap));
1773

1774
  SArray* cids = req->virTableInfoReq.cids;
556✔
1775
  STREAM_CHECK_NULL_GOTO(cids, terrno);
556!
1776

1777
  SArray* pTableListArray = qStreamGetTableArrayList(pTableList);
556✔
1778
  STREAM_CHECK_NULL_GOTO(pTableListArray, terrno);
556!
1779

1780
  vTableInfo.infos = taosArrayInit(taosArrayGetSize(pTableListArray), sizeof(VTableInfo));
556✔
1781
  STREAM_CHECK_NULL_GOTO(vTableInfo.infos, terrno);
556!
1782
  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
556✔
1783

1784
  for (size_t i = 0; i < taosArrayGetSize(pTableListArray); i++) {
1,848✔
1785
    STableKeyInfo* pKeyInfo = taosArrayGet(pTableListArray, i);
1,292✔
1786
    if (pKeyInfo == NULL) {
1,292!
1787
      continue;
×
1788
    }
1789
    VTableInfo* vTable = taosArrayReserve(vTableInfo.infos, 1);
1,292✔
1790
    STREAM_CHECK_NULL_GOTO(vTable, terrno);
1,292!
1791
    vTable->uid = pKeyInfo->uid;
1,292✔
1792
    vTable->gId = pKeyInfo->groupId;
1,292✔
1793

1794
    code = api.metaReaderFn.getTableEntryByUid(&metaReader, pKeyInfo->uid);
1,292✔
1795
    if (taosArrayGetSize(cids) == 1 && *(col_id_t*)taosArrayGet(cids, 0) == PRIMARYKEY_TIMESTAMP_COL_ID){
1,291!
1796
      vTable->cols.nCols = metaReader.me.colRef.nCols;
×
1797
      vTable->cols.version = metaReader.me.colRef.version;
×
1798
      vTable->cols.pColRef = taosMemoryCalloc(metaReader.me.colRef.nCols, sizeof(SColRef));
×
1799
      for (size_t j = 0; j < metaReader.me.colRef.nCols; j++) {
×
1800
        memcpy(vTable->cols.pColRef + j, &metaReader.me.colRef.pColRef[j], sizeof(SColRef));
×
1801
      }
1802
    } else {
1803
      vTable->cols.nCols = taosArrayGetSize(cids);
1,291✔
1804
      vTable->cols.version = metaReader.me.colRef.version;
1,291✔
1805
      vTable->cols.pColRef = taosMemoryCalloc(taosArrayGetSize(cids), sizeof(SColRef));
1,291!
1806
      for (size_t i = 0; i < taosArrayGetSize(cids); i++) {
7,767✔
1807
        for (size_t j = 0; j < metaReader.me.colRef.nCols; j++) {
27,522✔
1808
          if (metaReader.me.colRef.pColRef[j].hasRef &&
26,187✔
1809
              metaReader.me.colRef.pColRef[j].id == *(col_id_t*)taosArrayGet(cids, i)) {
19,568✔
1810
            memcpy(vTable->cols.pColRef + i, &metaReader.me.colRef.pColRef[j], sizeof(SColRef));
5,140✔
1811
            break;
5,140✔
1812
          }
1813
        }
1814
      }
1815
    }
1816
    tDecoderClear(&metaReader.coder);
1,290✔
1817
  }
1818
  ST_TASK_DLOG("vgId:%d %s end", TD_VID(pVnode), __func__);
556✔
1819
  STREAM_CHECK_RET_GOTO(buildVTableInfoRsp(&vTableInfo, &buf, &size));
556!
1820

1821
end:
556✔
1822
  nodesDestroyList(groupNew);
556✔
1823
  qStreamDestroyTableList(pTableList);
556✔
1824
  tDestroySStreamMsgVTableInfo(&vTableInfo);
556✔
1825
  api.metaReaderFn.clearReader(&metaReader);
556✔
1826
  STREAM_PRINT_LOG_END_WITHID(code, lino);
556!
1827
  SRpcMsg rsp = {
556✔
1828
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1829
  tmsgSendRsp(&rsp);
556✔
1830
  return code;
556✔
1831
}
1832

1833
static int32_t vnodeProcessStreamOTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req) {
141✔
1834
  int32_t                   code = 0;
141✔
1835
  int32_t                   lino = 0;
141✔
1836
  void*                     buf = NULL;
141✔
1837
  size_t                    size = 0;
141✔
1838
  SSTriggerOrigTableInfoRsp oTableInfo = {0};
141✔
1839
  SMetaReader               metaReader = {0};
141✔
1840
  int64_t streamId = req->base.streamId;
141✔
1841
  stsDebug("vgId:%d %s start", TD_VID(pVnode), __func__);
141✔
1842

1843
  SStorageAPI api = {0};
141✔
1844
  initStorageAPI(&api);
141✔
1845

1846
  SArray* cols = req->origTableInfoReq.cols;
141✔
1847
  STREAM_CHECK_NULL_GOTO(cols, terrno);
141!
1848

1849
  oTableInfo.cols = taosArrayInit(taosArrayGetSize(cols), sizeof(OTableInfoRsp));
141✔
1850

1851
  STREAM_CHECK_NULL_GOTO(oTableInfo.cols, terrno);
141!
1852

1853
  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
141✔
1854
  for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
1,418✔
1855
    OTableInfo*    oInfo = taosArrayGet(cols, i);
1,278✔
1856
    OTableInfoRsp* vTableInfo = taosArrayReserve(oTableInfo.cols, 1);
1,278✔
1857
    STREAM_CHECK_NULL_GOTO(oInfo, terrno);
1,278!
1858
    STREAM_CHECK_NULL_GOTO(vTableInfo, terrno);
1,278!
1859
    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByName(&metaReader, oInfo->refTableName));
1,278✔
1860
    vTableInfo->uid = metaReader.me.uid;
1,276✔
1861
    stsDebug("vgId:%d %s uid:%"PRId64, TD_VID(pVnode), __func__, vTableInfo->uid);
1,276✔
1862

1863
    SSchemaWrapper* sSchemaWrapper = NULL;
1,277✔
1864
    if (metaReader.me.type == TD_CHILD_TABLE) {
1,277✔
1865
      int64_t suid = metaReader.me.ctbEntry.suid;
1,261✔
1866
      vTableInfo->suid = suid;
1,261✔
1867
      tDecoderClear(&metaReader.coder);
1,261✔
1868
      STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, suid));
1,261!
1869
      sSchemaWrapper = &metaReader.me.stbEntry.schemaRow;
1,260✔
1870
    } else if (metaReader.me.type == TD_NORMAL_TABLE) {
16!
1871
      vTableInfo->suid = 0;
16✔
1872
      sSchemaWrapper = &metaReader.me.ntbEntry.schemaRow;
16✔
1873
    } else {
1874
      stError("invalid table type:%d", metaReader.me.type);
×
1875
    }
1876

1877
    for (size_t j = 0; j < sSchemaWrapper->nCols; j++) {
4,851!
1878
      SSchema* s = sSchemaWrapper->pSchema + j;
4,852✔
1879
      if (strcmp(s->name, oInfo->refColName) == 0) {
4,852✔
1880
        vTableInfo->cid = s->colId;
1,277✔
1881
        break;
1,277✔
1882
      }
1883
    }
1884
    if (vTableInfo->cid == 0) {
1,276!
1885
      stError("vgId:%d %s, not found col %s in table %s", TD_VID(pVnode), __func__, oInfo->refColName,
×
1886
              oInfo->refTableName);
1887
    }
1888
    tDecoderClear(&metaReader.coder);
1,276✔
1889
  }
1890

1891
  STREAM_CHECK_RET_GOTO(buildOTableInfoRsp(&oTableInfo, &buf, &size));
140!
1892

1893
end:
140✔
1894
  tDestroySTriggerOrigTableInfoRsp(&oTableInfo);
141✔
1895
  api.metaReaderFn.clearReader(&metaReader);
141✔
1896
  STREAM_PRINT_LOG_END(code, lino);
141!
1897
  SRpcMsg rsp = {
141✔
1898
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1899
  tmsgSendRsp(&rsp);
141✔
1900
  return code;
141✔
1901
}
1902

1903
static int32_t vnodeProcessStreamVTableTagInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req) {
2,472✔
1904
  int32_t                   code = 0;
2,472✔
1905
  int32_t                   lino = 0;
2,472✔
1906
  void*                     buf = NULL;
2,472✔
1907
  size_t                    size = 0;
2,472✔
1908
  SSDataBlock* pBlock = NULL;
2,472✔
1909

1910
  SMetaReader               metaReader = {0};
2,472✔
1911
  SMetaReader               metaReaderStable = {0};
2,472✔
1912
  int64_t streamId = req->base.streamId;
2,472✔
1913
  stsDebug("vgId:%d %s start", TD_VID(pVnode), __func__);
2,472✔
1914

1915
  SStorageAPI api = {0};
2,472✔
1916
  initStorageAPI(&api);
2,472✔
1917

1918
  SArray* cols = req->virTablePseudoColReq.cids;
2,472✔
1919
  STREAM_CHECK_NULL_GOTO(cols, terrno);
2,472!
1920

1921
  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
2,472✔
1922
  STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, req->virTablePseudoColReq.uid));
2,472!
1923

1924
  STREAM_CHECK_CONDITION_GOTO(metaReader.me.type != TD_VIRTUAL_CHILD_TABLE && metaReader.me.type != TD_VIRTUAL_NORMAL_TABLE, TSDB_CODE_INVALID_PARA);
2,472!
1925

1926
  STREAM_CHECK_RET_GOTO(createDataBlock(&pBlock));
2,472!
1927
  if (metaReader.me.type == TD_VIRTUAL_NORMAL_TABLE) {
2,472✔
1928
    STREAM_CHECK_CONDITION_GOTO (taosArrayGetSize(cols) < 1 && *(col_id_t*)taosArrayGet(cols, 0) != -1, TSDB_CODE_INVALID_PARA);
15!
1929
    SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN, -1);
15✔
1930
    STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
15!
1931
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, 1));
15!
1932
    pBlock->info.rows = 1;
15✔
1933
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, 0);
15✔
1934
    STREAM_CHECK_NULL_GOTO(pDst, terrno);
15!
1935
    STREAM_CHECK_RET_GOTO(varColSetVarData(pDst, 0, metaReader.me.name, strlen(metaReader.me.name), false));
15!
1936
  } else if (metaReader.me.type == TD_VIRTUAL_CHILD_TABLE){
2,457!
1937
    int64_t suid = metaReader.me.ctbEntry.suid;
2,457✔
1938
    api.metaReaderFn.readerReleaseLock(&metaReader);
2,457✔
1939
    api.metaReaderFn.initReader(&metaReaderStable, pVnode, META_READER_LOCK, &api.metaFn);
2,457✔
1940

1941
    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReaderStable, suid));
2,457!
1942
    SSchemaWrapper*  sSchemaWrapper = &metaReaderStable.me.stbEntry.schemaTag;
2,457✔
1943
    for (size_t i = 0; i < taosArrayGetSize(cols); i++){
18,299✔
1944
      col_id_t* id = taosArrayGet(cols, i);
15,841✔
1945
      STREAM_CHECK_NULL_GOTO(id, terrno);
15,841!
1946
      if (*id == -1) {
15,842✔
1947
        SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN, -1);
2,457✔
1948
        STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
2,457!
1949
        continue;
2,457✔
1950
      }
1951
      size_t j = 0;
13,385✔
1952
      for (; j < sSchemaWrapper->nCols; j++) {
50,467!
1953
        SSchema* s = sSchemaWrapper->pSchema + j;
50,467✔
1954
        if (s->colId == *id) {
50,467✔
1955
          SColumnInfoData idata = createColumnInfoData(s->type, s->bytes, s->colId);
13,385✔
1956
          STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
13,385!
1957
          break;
13,385✔
1958
        }
1959
      }
1960
      if (j == sSchemaWrapper->nCols) {
13,385!
1961
        SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_NULL, CHAR_BYTES, *id);
×
1962
        STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
×
1963
      }
1964
    }
1965
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, 1));
2,457!
1966
    pBlock->info.rows = 1;
2,457✔
1967
    
1968
    for (size_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++){
18,299✔
1969
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
15,842✔
1970
      STREAM_CHECK_NULL_GOTO(pDst, terrno);
15,842!
1971

1972
      if (pDst->info.colId == -1) {
15,842✔
1973
        STREAM_CHECK_RET_GOTO(varColSetVarData(pDst, 0, metaReader.me.name, strlen(metaReader.me.name), false));
2,457!
1974
        continue;
2,457✔
1975
      }
1976
      if (pDst->info.type == TSDB_DATA_TYPE_NULL) {
13,385!
1977
        STREAM_CHECK_RET_GOTO(colDataSetVal(pDst, 0, NULL, true));
×
1978
        continue;
×
1979
      }
1980

1981
      STagVal val = {0};
13,385✔
1982
      val.cid = pDst->info.colId;
13,385✔
1983
      const char* p = api.metaFn.extractTagVal(metaReader.me.ctbEntry.pTags, pDst->info.type, &val);
13,385✔
1984

1985
      char* data = NULL;
13,385✔
1986
      if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
13,385!
1987
        data = tTagValToData((const STagVal*)p, false);
11,458✔
1988
      } else {
1989
        data = (char*)p;
1,927✔
1990
      }
1991

1992
      STREAM_CHECK_RET_GOTO(colDataSetVal(pDst, 0, data,
13,385!
1993
                            (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data))));
1994

1995
      if ((pDst->info.type != TSDB_DATA_TYPE_JSON) && (p != NULL) && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
13,385!
1996
          (data != NULL)) {
1997
        taosMemoryFree(data);
7,656!
1998
      }
1999
    }
2000
  } else {
2001
    stError("vgId:%d %s, invalid table type:%d", TD_VID(pVnode), __func__, metaReader.me.type);
×
2002
    code = TSDB_CODE_INVALID_PARA;
×
2003
    goto end;
×
2004
  }
2005
  
2006
  stsDebug("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlock->info.rows);
2,472✔
2007
  printDataBlock(pBlock, __func__, "");
2,472✔
2008
  STREAM_CHECK_RET_GOTO(buildRsp(pBlock, &buf, &size));
2,472!
2009

2010
end:
2,472✔
2011
  api.metaReaderFn.clearReader(&metaReaderStable);
2,472✔
2012
  api.metaReaderFn.clearReader(&metaReader);
2,472✔
2013
  STREAM_PRINT_LOG_END(code, lino);
2,472!
2014
  SRpcMsg rsp = {
2,472✔
2015
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
2016
  tmsgSendRsp(&rsp);
2,472✔
2017
  blockDataDestroy(pBlock);
2,472✔
2018
  return code;
2,472✔
2019
}
2020

2021
static int32_t vnodeProcessStreamFetchMsg(SVnode* pVnode, SRpcMsg* pMsg) {
16,374✔
2022
  int32_t            code = 0;
16,374✔
2023
  int32_t            lino = 0;
16,374✔
2024
  void*              buf = NULL;
16,374✔
2025
  size_t             size = 0;
16,374✔
2026
  void*              taskAddr = NULL;
16,374✔
2027
  SArray*            pResList = NULL;
16,374✔
2028

2029
  SResFetchReq req = {0};
16,374✔
2030
  STREAM_CHECK_CONDITION_GOTO(tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0,
16,374!
2031
                              TSDB_CODE_QRY_INVALID_INPUT);
2032
  SArray* calcInfoList = (SArray*)qStreamGetReaderInfo(req.queryId, req.taskId, &taskAddr);
16,374✔
2033
  STREAM_CHECK_NULL_GOTO(calcInfoList, terrno);
16,374!
2034

2035
  STREAM_CHECK_CONDITION_GOTO(req.execId < 0, TSDB_CODE_INVALID_PARA);
16,374!
2036
  SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo = taosArrayGetP(calcInfoList, req.execId);
16,374✔
2037
  STREAM_CHECK_NULL_GOTO(sStreamReaderCalcInfo, terrno);
16,374!
2038
  void* pTask = sStreamReaderCalcInfo->pTask;
16,374✔
2039
  ST_TASK_DLOG("vgId:%d %s start, execId:%d, reset:%d, pTaskInfo:%p, scan type:%d", TD_VID(pVnode), __func__, req.execId, req.reset,
16,374✔
2040
               sStreamReaderCalcInfo->pTaskInfo, nodeType(sStreamReaderCalcInfo->calcAst->pNode));
2041

2042
  if (req.reset || sStreamReaderCalcInfo->pTaskInfo == NULL) {
16,374!
2043
    qDestroyTask(sStreamReaderCalcInfo->pTaskInfo);
16,311✔
2044
    int64_t uid = 0;
16,311✔
2045
    if (req.dynTbname) {
16,311✔
2046
      SArray* vals = req.pStRtFuncInfo->pStreamPartColVals;
6✔
2047
      for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
6!
2048
        SStreamGroupValue* pValue = taosArrayGet(vals, i);
6✔
2049
        if (pValue != NULL && pValue->isTbname) {
6!
2050
          uid = pValue->uid;
6✔
2051
          break;
6✔
2052
        }
2053
      }
2054
    }
2055
    
2056
    SReadHandle handle = {0};
16,311✔
2057
    handle.vnode = pVnode;
16,311✔
2058
    handle.uid = uid;
16,311✔
2059

2060
    initStorageAPI(&handle.api);
16,311✔
2061
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(sStreamReaderCalcInfo->calcAst->pNode) ||
16,311✔
2062
      QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == nodeType(sStreamReaderCalcInfo->calcAst->pNode)){
15,866✔
2063
      STimeRangeNode* node = (STimeRangeNode*)((STableScanPhysiNode*)(sStreamReaderCalcInfo->calcAst->pNode))->pTimeRange;
16,240✔
2064
      if (node != NULL) {
16,240✔
2065
        STREAM_CHECK_RET_GOTO(processCalaTimeRange(sStreamReaderCalcInfo, &req, node, &handle));
16,223!
2066
      }
2067
    }
2068

2069
    TSWAP(sStreamReaderCalcInfo->rtInfo.funcInfo, *req.pStRtFuncInfo);
16,311✔
2070
    handle.streamRtInfo = &sStreamReaderCalcInfo->rtInfo;
16,311✔
2071

2072
    // if (sStreamReaderCalcInfo->pTaskInfo == NULL) {
2073
    STREAM_CHECK_RET_GOTO(qCreateStreamExecTaskInfo(&sStreamReaderCalcInfo->pTaskInfo,
16,311✔
2074
                                                    sStreamReaderCalcInfo->calcScanPlan, &handle, NULL, TD_VID(pVnode),
2075
                                                    req.taskId));
2076
    // } else {
2077
    // STREAM_CHECK_RET_GOTO(qResetTableScan(sStreamReaderCalcInfo->pTaskInfo, handle.winRange));
2078
    // }
2079

2080
    STREAM_CHECK_RET_GOTO(qSetTaskId(sStreamReaderCalcInfo->pTaskInfo, req.taskId, req.queryId));
16,305!
2081
  }
2082

2083
  if (req.pOpParam != NULL) {
16,368✔
2084
    qUpdateOperatorParam(sStreamReaderCalcInfo->pTaskInfo, req.pOpParam);
48✔
2085
  }
2086
  
2087
  pResList = taosArrayInit(4, POINTER_BYTES);
16,368✔
2088
  STREAM_CHECK_NULL_GOTO(pResList, terrno);
16,368!
2089
  uint64_t ts = 0;
16,368✔
2090
  bool     hasNext = false;
16,368✔
2091
  STREAM_CHECK_RET_GOTO(qExecTaskOpt(sStreamReaderCalcInfo->pTaskInfo, pResList, &ts, &hasNext, NULL, req.pOpParam != NULL));
16,368✔
2092

2093
  for(size_t i = 0; i < taosArrayGetSize(pResList); i++){
26,718✔
2094
    SSDataBlock* pBlock = taosArrayGetP(pResList, i);
10,360✔
2095
    if (pBlock == NULL) continue;
10,360!
2096
    printDataBlock(pBlock, __func__, "fetch");
10,360✔
2097
    if (sStreamReaderCalcInfo->rtInfo.funcInfo.withExternalWindow) {
10,360✔
2098
      STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, sStreamReaderCalcInfo->pFilterInfo));
344!
2099
      printDataBlock(pBlock, __func__, "fetch filter");
344✔
2100
    }
2101
  }
2102

2103
  STREAM_CHECK_RET_GOTO(streamBuildFetchRsp(pResList, hasNext, &buf, &size, pVnode->config.tsdbCfg.precision));
16,358!
2104
  ST_TASK_DLOG("vgId:%d %s end:", TD_VID(pVnode), __func__);
16,357✔
2105

2106
end:
158✔
2107
  taosArrayDestroy(pResList);
16,373✔
2108
  streamReleaseTask(taskAddr);
16,374✔
2109

2110
  STREAM_PRINT_LOG_END(code, lino);
16,373!
2111
  SRpcMsg rsp = {.msgType = TDMT_STREAM_FETCH_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
16,373✔
2112
  tmsgSendRsp(&rsp);
16,373✔
2113
  tDestroySResFetchReq(&req);
16,373✔
2114
  return code;
16,373✔
2115
}
2116

2117
int32_t vnodeProcessStreamReaderMsg(SVnode* pVnode, SRpcMsg* pMsg) {
77,603✔
2118
  int32_t                   code = 0;
77,603✔
2119
  int32_t                   lino = 0;
77,603✔
2120
  SSTriggerPullRequestUnion req = {0};
77,603✔
2121
  void*                     taskAddr = NULL;
77,603✔
2122

2123
  vDebug("vgId:%d, msg:%p in stream reader queue is processing", pVnode->config.vgId, pMsg);
77,603✔
2124
  if (!syncIsReadyForRead(pVnode->sync)) {
77,603✔
2125
    vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
128✔
2126
    return 0;
125✔
2127
  }
2128

2129
  if (pMsg->msgType == TDMT_STREAM_FETCH) {
77,499✔
2130
    return vnodeProcessStreamFetchMsg(pVnode, pMsg);
16,374✔
2131
  } else if (pMsg->msgType == TDMT_STREAM_TRIGGER_PULL) {
61,125!
2132
    void*   pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
61,126✔
2133
    int32_t len = pMsg->contLen - sizeof(SMsgHead);
61,126✔
2134
    STREAM_CHECK_RET_GOTO(tDeserializeSTriggerPullRequest(pReq, len, &req));
61,126!
2135
    stDebug("vgId:%d %s start, type:%d, streamId:%" PRIx64 ", readerTaskId:%" PRIx64 ", sessionId:%" PRIx64,
61,121✔
2136
            TD_VID(pVnode), __func__, req.base.type, req.base.streamId, req.base.readerTaskId, req.base.sessionId);
2137
    SStreamTriggerReaderInfo* sStreamReaderInfo = (STRIGGER_PULL_OTABLE_INFO == req.base.type) ? NULL : qStreamGetReaderInfo(req.base.streamId, req.base.readerTaskId, &taskAddr);
61,113✔
2138
    switch (req.base.type) {
61,106!
2139
      case STRIGGER_PULL_SET_TABLE:
140✔
2140
        code = vnodeProcessStreamSetTableReq(pVnode, pMsg, &req, sStreamReaderInfo);
140✔
2141
        break;
140✔
2142
      case STRIGGER_PULL_LAST_TS:
414✔
2143
        code = vnodeProcessStreamLastTsReq(pVnode, pMsg, &req, sStreamReaderInfo);
414✔
2144
        break;
414✔
2145
      case STRIGGER_PULL_FIRST_TS:
361✔
2146
        code = vnodeProcessStreamFirstTsReq(pVnode, pMsg, &req, sStreamReaderInfo);
361✔
2147
        break;
361✔
2148
      case STRIGGER_PULL_TSDB_META:
589✔
2149
      case STRIGGER_PULL_TSDB_META_NEXT:
2150
        code = vnodeProcessStreamTsdbMetaReq(pVnode, pMsg, &req, sStreamReaderInfo);
589✔
2151
        break;
589✔
2152
      case STRIGGER_PULL_TSDB_TS_DATA:
33✔
2153
        code = vnodeProcessStreamTsdbTsDataReq(pVnode, pMsg, &req, sStreamReaderInfo);
33✔
2154
        break;
33✔
2155
      case STRIGGER_PULL_TSDB_TRIGGER_DATA:
341✔
2156
      case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT:
2157
        code = vnodeProcessStreamTsdbTriggerDataReq(pVnode, pMsg, &req, sStreamReaderInfo);
341✔
2158
        break;
341✔
2159
      case STRIGGER_PULL_TSDB_CALC_DATA:
28,791✔
2160
      case STRIGGER_PULL_TSDB_CALC_DATA_NEXT:
2161
        code = vnodeProcessStreamTsdbCalcDataReq(pVnode, pMsg, &req, sStreamReaderInfo);
28,791✔
2162
        break;
28,788✔
2163
      case STRIGGER_PULL_TSDB_DATA:
375✔
2164
      case STRIGGER_PULL_TSDB_DATA_NEXT:
2165
        code = vnodeProcessStreamTsdbVirtalDataReq(pVnode, pMsg, &req, sStreamReaderInfo);
375✔
2166
        break;
375✔
2167
      case STRIGGER_PULL_WAL_META:
8,900✔
2168
        code = vnodeProcessStreamWalMetaReq(pVnode, pMsg, &req, sStreamReaderInfo);
8,900✔
2169
        break;
8,917✔
2170
      case STRIGGER_PULL_WAL_TS_DATA:
17,280✔
2171
      case STRIGGER_PULL_WAL_TRIGGER_DATA:
2172
      case STRIGGER_PULL_WAL_CALC_DATA:
2173
      case STRIGGER_PULL_WAL_DATA:
2174
        code = vnodeProcessStreamWalDataReq(pVnode, pMsg, &req, sStreamReaderInfo);
17,280✔
2175
        break;
17,285✔
2176
      case STRIGGER_PULL_GROUP_COL_VALUE:
713✔
2177
        code = vnodeProcessStreamGroupColValueReq(pVnode, pMsg, &req, sStreamReaderInfo);
713✔
2178
        break;
713✔
2179
      case STRIGGER_PULL_VTABLE_INFO:
556✔
2180
        code = vnodeProcessStreamVTableInfoReq(pVnode, pMsg, &req, sStreamReaderInfo);
556✔
2181
        break;
556✔
2182
      case STRIGGER_PULL_VTABLE_PSEUDO_COL:
2,472✔
2183
        code = vnodeProcessStreamVTableTagInfoReq(pVnode, pMsg, &req);
2,472✔
2184
        break;
2,472✔
2185
      case STRIGGER_PULL_OTABLE_INFO:
141✔
2186
        code = vnodeProcessStreamOTableInfoReq(pVnode, pMsg, &req);
141✔
2187
        break;
141✔
2188
      default:
×
2189
        vError("unknown inner msg type:%d in stream reader queue", req.base.type);
×
2190
        code = TSDB_CODE_APP_ERROR;
×
2191
        break;
×
2192
    }
2193
  } else {
2194
    vError("unknown msg type:%d in stream reader queue", pMsg->msgType);
×
2195
    code = TSDB_CODE_APP_ERROR;
×
2196
  }
2197
end:
61,125✔
2198

2199
  streamReleaseTask(taskAddr);
61,125✔
2200

2201
  tDestroySTriggerPullRequest(&req);
61,124✔
2202
  STREAM_PRINT_LOG_END(code, lino);
61,120!
2203
  return code;
61,118✔
2204
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc