• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4693

29 Aug 2025 06:36AM UTC coverage: 58.007% (-1.1%) from 59.151%
#4693

push

travis-ci

web-flow
fix(gpt): fix race-condition in preparing tmp files (#32800)

132676 of 291873 branches covered (45.46%)

Branch coverage included in aggregate %.

5 of 34 new or added lines in 6 files covered. (14.71%)

4288 existing lines in 199 files now uncovered.

201114 of 283561 relevant lines covered (70.92%)

5433357.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.84
/source/dnode/vnode/src/vnd/vnodeStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "scalar.h"
17
#include "streamReader.h"
18
#include "tdatablock.h"
19
#include "tdb.h"
20
#include "tencode.h"
21
#include "tglobal.h"
22
#include "tmsg.h"
23
#include "vnd.h"
24
#include "vnode.h"
25
#include "vnodeInt.h"
26

27
/*
 * Declares a local SStreamTriggerReaderTaskInnerOptions named `options` and fills it from
 * `sStreamInfo` plus the explicit arguments.
 *
 * - `.suid` comes from sStreamInfo only when `_uidList` is NULL; otherwise it is 0.
 * - `startTime` / `endTime` become `.twindows.skey` / `.twindows.ekey`.
 * - The expansion already ends with ';', so the macro is used as a declaration statement.
 *
 * Every argument is wrapped in parentheses so expression arguments keep their own precedence.
 * Note that `sStreamInfo` and `_uidList` are evaluated more than once.
 */
#define BUILD_OPTION(options, sStreamInfo, _ver, _groupSort, _order, startTime, endTime, _schemas, _isSchema, _scanMode, \
                     _gid, _initReader, _uidList)                                                                        \
  SStreamTriggerReaderTaskInnerOptions options = {.suid = ((_uidList) == NULL ? (sStreamInfo)->suid : 0),                \
                                                  .uid = (sStreamInfo)->uid,                                             \
                                                  .ver = (_ver),                                                         \
                                                  .tableType = (sStreamInfo)->tableType,                                 \
                                                  .groupSort = (_groupSort),                                             \
                                                  .order = (_order),                                                     \
                                                  .twindows = {.skey = (startTime), .ekey = (endTime)},                  \
                                                  .pTagCond = (sStreamInfo)->pTagCond,                                   \
                                                  .pTagIndexCond = (sStreamInfo)->pTagIndexCond,                         \
                                                  .pConditions = (sStreamInfo)->pConditions,                             \
                                                  .partitionCols = (sStreamInfo)->partitionCols,                         \
                                                  .schemas = (_schemas),                                                 \
                                                  .isSchema = (_isSchema),                                               \
                                                  .scanMode = (_scanMode),                                               \
                                                  .gid = (_gid),                                                         \
                                                  .initReader = (_initReader),                                           \
                                                  .uidList = (_uidList)};
46

47
/*
 * Combines `session` and `type` into one 64-bit key: `type` is moved up by 32 bits and
 * OR-ed with `session`.
 *
 * The shift and OR are done on unsigned operands so that a negative or large `type`
 * does not trigger signed-shift undefined behaviour; for non-negative inputs that fit,
 * the result is identical to `session | (type << 32)`.
 */
static int64_t getSessionKey(int64_t session, int64_t type) {
  uint64_t typeBits = (uint64_t)type << 32;
  return (int64_t)((uint64_t)session | typeBits);
}
29,404✔
48

49
static int32_t buildScheamFromMeta(SVnode* pVnode, int64_t uid, SArray** schemas);
50

51
/*
 * Copies one fixed-width value into column `index` of pResBlock, at the row slot
 * currently given by pResBlock->info.rows. The row counter is not advanced here.
 *
 * Returns 0 on success, or terrno when the column cannot be fetched from the block.
 * NOTE(review): no capacity check is made before the copy; presumably callers reserve
 * the row first — confirm.
 */
static int32_t addColData(SSDataBlock* pResBlock, int32_t index, void* data) {
  SColumnInfoData* pDstCol = taosArrayGet(pResBlock->pDataBlock, index);
  if (pDstCol != NULL) {
    // destination = column buffer + (current row * value width)
    memcpy(pDstCol->pData + pResBlock->info.rows * pDstCol->info.bytes, data, pDstCol->info.bytes);
    return 0;
  }

  return terrno;
}
60

61
/*
 * Advances the task's reader to its next data block via tsdNextDataBlock and reports
 * through *hasNext what that call stores there.
 *
 * If the advance fails, the reader's current data block is released before the error
 * code is returned.
 */
static int32_t getTableDataInfo(SStreamReaderTaskInner* pTask, bool* hasNext) {
  int32_t rc = pTask->api.tsdReader.tsdNextDataBlock(pTask->pReader, hasNext);
  if (rc == TSDB_CODE_SUCCESS) {
    return rc;
  }

  pTask->api.tsdReader.tsdReaderReleaseDataBlock(pTask->pReader);
  return rc;
}
69

70
/*
 * Thin wrapper around tsdReaderRetrieveDataBlock: asks the task's reader to hand back a
 * data block through *ppRes. The third argument of the underlying call is always NULL.
 */
static int32_t getTableData(SStreamReaderTaskInner* pTask, SSDataBlock** ppRes) {
  int32_t rc = pTask->api.tsdReader.tsdReaderRetrieveDataBlock(pTask->pReader, ppRes, NULL);
  return rc;
}
73

74
static int32_t buildOTableInfoRsp(const SSTriggerOrigTableInfoRsp* rsp, void** data, size_t* size) {
28✔
75
  int32_t code = 0;
28✔
76
  int32_t lino = 0;
28✔
77
  void*   buf = NULL;
28✔
78
  int32_t len = tSerializeSTriggerOrigTableInfoRsp(NULL, 0, rsp);
28✔
79
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
28!
80
  buf = rpcMallocCont(len);
28✔
81
  STREAM_CHECK_NULL_GOTO(buf, terrno);
28!
82
  int32_t actLen = tSerializeSTriggerOrigTableInfoRsp(buf, len, rsp);
28✔
83
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
28!
84
  *data = buf;
28✔
85
  *size = len;
28✔
86
  buf = NULL;
28✔
87
end:
28✔
88
  rpcFreeCont(buf);
28✔
89
  return code;
28✔
90
}
91

92
static int32_t buildVTableInfoRsp(const SStreamMsgVTableInfo* rsp, void** data, size_t* size) {
56✔
93
  int32_t code = 0;
56✔
94
  int32_t lino = 0;
56✔
95
  void*   buf = NULL;
56✔
96
  int32_t len = tSerializeSStreamMsgVTableInfo(NULL, 0, rsp);
56✔
97
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
56!
98
  buf = rpcMallocCont(len);
56✔
99
  STREAM_CHECK_NULL_GOTO(buf, terrno);
56!
100
  int32_t actLen = tSerializeSStreamMsgVTableInfo(buf, len, rsp);
56✔
101
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
56!
102
  *data = buf;
56✔
103
  *size = len;
56✔
104
  buf = NULL;
56✔
105
end:
56✔
106
  rpcFreeCont(buf);
56✔
107
  return code;
56✔
108
}
109

110
static int32_t buildTsRsp(const SStreamTsResponse* tsRsp, void** data, size_t* size) {
621✔
111
  int32_t code = 0;
621✔
112
  int32_t lino = 0;
621✔
113
  void*   buf = NULL;
621✔
114
  int32_t len = tSerializeSStreamTsResponse(NULL, 0, tsRsp);
621✔
115
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
620!
116
  buf = rpcMallocCont(len);
620✔
117
  STREAM_CHECK_NULL_GOTO(buf, terrno);
620!
118
  int32_t actLen = tSerializeSStreamTsResponse(buf, len, tsRsp);
620✔
119
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
621!
120
  *data = buf;
621✔
121
  *size = len;
621✔
122
  buf = NULL;
621✔
123
end:
621✔
124
  rpcFreeCont(buf);
621✔
125
  return code;
621✔
126
}
127

128

129
/*
 * Encodes a data block into a buffer obtained from rpcMallocCont.
 *
 * A NULL block or a block with zero rows is not an error: the function returns success
 * without touching *data or *size. Otherwise the buffer is sized with blockGetEncodeSize,
 * filled with blockEncode, and handed to the caller through *data; *size is set to the
 * size computed up front (not to the length blockEncode returned).
 *
 * Returns 0 on success, or terrno when the allocation fails or blockEncode returns a
 * negative length. The buffer is released here on failure.
 */
static int32_t buildRsp(SSDataBlock* pBlock, void** data, size_t* size) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   pCont = NULL;

  bool nothingToSend = (pBlock == NULL) || (pBlock->info.rows == 0);
  STREAM_CHECK_CONDITION_GOTO(nothingToSend, TSDB_CODE_SUCCESS);

  size_t encodeSize = blockGetEncodeSize(pBlock);
  pCont = rpcMallocCont(encodeSize);
  STREAM_CHECK_NULL_GOTO(pCont, terrno);

  int32_t encodedLen = blockEncode(pBlock, pCont, encodeSize, taosArrayGetSize(pBlock->pDataBlock));
  STREAM_CHECK_CONDITION_GOTO(encodedLen < 0, terrno);

  *data = pCont;
  *size = encodeSize;
  pCont = NULL;  // ownership moved to the caller; keep the cleanup below from freeing it

end:
  rpcFreeCont(pCont);
  return code;
}
146

147
/*
 * Re-targets the task's reader at the group selected by pTask->currentGroupIndex.
 *
 * Steps, in order: fetch that group's table list, install it on the reader, rebuild the
 * query condition in pTask->cond from the task options, and reset the reader status with
 * the new condition. The first failing step aborts the sequence and its code is returned
 * after being passed to STREAM_PRINT_LOG_END.
 */
static int32_t resetTsdbReader(SStreamReaderTaskInner* pTask) {
  int32_t        code = 0;
  int32_t        lino = 0;
  STableKeyInfo* pKeyInfo = NULL;
  int32_t        numOfTables = 1;

  STREAM_CHECK_RET_GOTO(qStreamGetTableList(pTask->pTableList, pTask->currentGroupIndex, &pKeyInfo, &numOfTables));
  STREAM_CHECK_RET_GOTO(pTask->api.tsdReader.tsdSetQueryTableList(pTask->pReader, pKeyInfo, numOfTables));

  // drop the previous condition before building a fresh one in the same storage
  cleanupQueryTableDataCond(&pTask->cond);
  STREAM_CHECK_RET_GOTO(qStreamInitQueryTableDataCond(&pTask->cond, pTask->options.order, pTask->options.schemas, true,
                                                      pTask->options.twindows, pTask->options.suid, pTask->options.ver));

  STREAM_CHECK_RET_GOTO(pTask->api.tsdReader.tsdReaderResetStatus(pTask->pReader, &pTask->cond));

end:
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
164

165
/*
 * Writes one WAL meta record into the current row of pBlock, one value per column, in
 * this column order: type, [gid], uid, skey, ekey, ver, rows.
 *
 * The gid column is written only when isVTable is false, so the following columns sit one
 * slot earlier for virtual tables. The block's row counter is not advanced here.
 *
 * Returns 0 on success or the first error reported by addColData.
 */
static int32_t buildWalMetaBlock(SSDataBlock* pBlock, int8_t type, int64_t gid, bool isVTable, int64_t uid,
                                 int64_t skey, int64_t ekey, int64_t ver, int64_t rows) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t slot = 0;  // next destination column

  STREAM_CHECK_RET_GOTO(addColData(pBlock, slot++, &type));
  if (isVTable == false) {
    STREAM_CHECK_RET_GOTO(addColData(pBlock, slot++, &gid));
  }
  STREAM_CHECK_RET_GOTO(addColData(pBlock, slot++, &uid));
  STREAM_CHECK_RET_GOTO(addColData(pBlock, slot++, &skey));
  STREAM_CHECK_RET_GOTO(addColData(pBlock, slot++, &ekey));
  STREAM_CHECK_RET_GOTO(addColData(pBlock, slot++, &ver));
  STREAM_CHECK_RET_GOTO(addColData(pBlock, slot++, &rows));

end:
  STREAM_PRINT_LOG_END(code, lino)
  return code;
}
184

185
/*
 * Fills pTSchema so that it describes exactly one column with the given id, type and
 * width, and stamps it with schema version `ver`.
 *
 * The caller must supply storage with room for one column entry; nothing is allocated here.
 */
static void buildTSchema(STSchema* pTSchema, int32_t ver, col_id_t colId, int8_t type, int32_t bytes) {
  STColumn* pOnlyCol = &pTSchema->columns[0];

  pOnlyCol->colId = colId;
  pOnlyCol->type = type;
  pOnlyCol->bytes = bytes;

  pTSchema->version = ver;
  pTSchema->numOfCols = 1;
}
3,406✔
192

193
/*
 * Appends one WAL_SUBMIT_DATA meta row to pBlock for a single submitted table payload.
 *
 * The payload is ignored (success, nothing appended) when its uid is not tracked:
 *   - virtual tables: pTableList is used as a hash keyed by uid;
 *   - otherwise: pTableList is queried with qStreamUidInTableList, and the group id is
 *     fetched with qStreamGetGroupId.
 *
 * The first and last timestamps are read from the payload:
 *   - column format: from the first and last value of column 0;
 *   - row format: from column 0 of the first and last row, decoded through a temporary
 *     one-column timestamp schema that is freed before returning.
 *
 * On success pBlock->info.rows is incremented by one.
 * Returns 0 on success or when the payload is skipped, otherwise the failing step's code.
 */
int32_t retrieveWalMetaData(SSubmitTbData* pSubmitTbData, void* pTableList, bool isVTable, SSDataBlock* pBlock,
                            int64_t ver) {
  int32_t code = 0;
  int32_t lino = 0;

  int64_t   uid = pSubmitTbData->uid;
  int32_t   numOfRows = 0;
  int64_t   skey = 0;
  int64_t   ekey = 0;
  STSchema* pTSchema = NULL;
  uint64_t  gid = 0;
  if (isVTable) {
    STREAM_CHECK_CONDITION_GOTO(taosHashGet(pTableList, &uid, sizeof(uid)) == NULL, TDB_CODE_SUCCESS);
  } else {
    STREAM_CHECK_CONDITION_GOTO(!qStreamUidInTableList(pTableList, uid), TDB_CODE_SUCCESS);
    gid = qStreamGetGroupId(pTableList, uid);
  }

  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    SColData* pCol = taosArrayGet(pSubmitTbData->aCol, 0);
    STREAM_CHECK_NULL_GOTO(pCol, terrno);
    numOfRows = pCol->nVal;

    SColVal colVal = {0};
    STREAM_CHECK_RET_GOTO(tColDataGetValue(pCol, 0, &colVal));
    skey = VALUE_GET_TRIVIAL_DATUM(&colVal.value);

    STREAM_CHECK_RET_GOTO(tColDataGetValue(pCol, numOfRows - 1, &colVal));
    ekey = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
  } else {
    numOfRows = taosArrayGetSize(pSubmitTbData->aRowP);
    SRow* pRow = taosArrayGetP(pSubmitTbData->aRowP, 0);
    STREAM_CHECK_NULL_GOTO(pRow, terrno);
    SColVal colVal = {0};
    // minimal schema holding only the timestamp column, enough to decode column 0 of a row
    pTSchema = taosMemoryCalloc(1, sizeof(STSchema) + sizeof(STColumn));
    STREAM_CHECK_NULL_GOTO(pTSchema, terrno);
    buildTSchema(pTSchema, pSubmitTbData->sver, PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES);
    STREAM_CHECK_RET_GOTO(tRowGet(pRow, pTSchema, 0, &colVal));
    skey = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
    pRow = taosArrayGetP(pSubmitTbData->aRowP, numOfRows - 1);
    // the last row gets the same NULL guard as the first one before it is decoded
    STREAM_CHECK_NULL_GOTO(pRow, terrno);
    STREAM_CHECK_RET_GOTO(tRowGet(pRow, pTSchema, 0, &colVal));
    ekey = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
  }

  STREAM_CHECK_RET_GOTO(buildWalMetaBlock(pBlock, WAL_SUBMIT_DATA, gid, isVTable, uid, skey, ekey, ver, numOfRows));
  pBlock->info.rows++;
  // uid/skey/ekey are signed 64-bit values, so they are printed with PRId64; gid is unsigned
  stDebug("stream reader scan submit data:uid %" PRId64 ", skey %" PRId64 ", ekey %" PRId64 ", gid %" PRIu64
          ", rows:%d",
          uid, skey, ekey, gid, numOfRows);

end:
  taosMemoryFree(pTSchema);
  STREAM_PRINT_LOG_END(code, lino)
  return code;
}
248

249
/*
 * Copies the rows of one submitted table payload into pBlock, optionally restricted to
 * the timestamps in [window->skey, window->ekey] (window may be NULL for "all rows").
 *
 * The table schema for (uid, sver) is loaded with metaGetTbTSchema and freed before
 * returning. Two payload layouts are handled:
 *
 *   - column format: the timestamp column (index 0) is scanned to find the first row at or
 *     after skey and the first row after ekey; that row range is then copied column by
 *     column, matching source and destination columns by column id.
 *   - row format: each row inside the window is decoded; for every destination column the
 *     source columns are walked in order until one with an id >= the destination id is
 *     found. A missing or non-value cell becomes NULL.
 *
 * On success pBlock->info.rows is set to the number of rows copied. When the column-format
 * window selects nothing, the function returns success without updating pBlock->info.rows.
 *
 * Returns 0 on success, TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND when the schema is missing, or
 * the failing step's code.
 *
 * NOTE(review): the column-format path stores every value through colDataSetVal, while the
 * row-format path routes variable-length/decimal values through varColSetVarData — confirm
 * whether the column-format path needs the same distinction.
 */
int32_t retrieveWalData(SVnode* pVnode, SSubmitTbData* pSubmitTbData, SSDataBlock* pBlock, STimeWindow* window) {
  stDebug("stream reader retrieve data block %p", pSubmitTbData);
  int32_t code = 0;
  int32_t lino = 0;

  int32_t ver = pSubmitTbData->sver;
  int64_t uid = pSubmitTbData->uid;
  int32_t numOfRows = 0;

  STSchema* schemas = metaGetTbTSchema(pVnode->pMeta, uid, ver, 1);
  STREAM_CHECK_NULL_GOTO(schemas, TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND);
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    SColData* pTsCol = taosArrayGet(pSubmitTbData->aCol, 0);
    STREAM_CHECK_NULL_GOTO(pTsCol, terrno);
    int32_t rowStart = 0;
    int32_t rowEnd = pTsCol->nVal;  // exclusive
    if (window != NULL) {
      SColVal colVal = {0};
      rowStart = -1;
      rowEnd = -1;
      for (int32_t k = 0; k < pTsCol->nVal; k++) {
        STREAM_CHECK_RET_GOTO(tColDataGetValue(pTsCol, k, &colVal));
        int64_t ts = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
        if (ts >= window->skey && rowStart == -1) {
          rowStart = k;
        }
        if (ts > window->ekey && rowEnd == -1) {
          rowEnd = k;
        }
      }
      // nothing at or after skey, or the first such row is already past ekey
      STREAM_CHECK_CONDITION_GOTO(rowStart == -1 || rowStart == rowEnd, TDB_CODE_SUCCESS);

      if (rowStart != -1 && rowEnd == -1) {
        rowEnd = pTsCol->nVal;
      }
    }
    numOfRows = rowEnd - rowStart;
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, numOfRows));
    for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
      STREAM_CHECK_NULL_GOTO(pColData, terrno);
      if (i >= taosArrayGetSize(pSubmitTbData->aCol)) {
        break;
      }
      for (int32_t j = 0; j < taosArrayGetSize(pSubmitTbData->aCol); j++) {
        SColData* pSrcCol = taosArrayGet(pSubmitTbData->aCol, j);
        STREAM_CHECK_NULL_GOTO(pSrcCol, terrno);
        if (pSrcCol->cid == pColData->info.colId) {
          for (int32_t k = rowStart; k < rowEnd; k++) {
            SColVal colVal = {0};
            STREAM_CHECK_RET_GOTO(tColDataGetValue(pSrcCol, k, &colVal));
            // Destination rows start at 0: capacity was reserved for numOfRows rows only,
            // so the source index must be rebased by rowStart.
            STREAM_CHECK_RET_GOTO(colDataSetVal(pColData, k - rowStart,
                                                VALUE_GET_DATUM(&colVal.value, colVal.value.type),
                                                !COL_VAL_IS_VALUE(&colVal)));
          }
        }
      }
    }
  } else {
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, taosArrayGetSize(pSubmitTbData->aRowP)));
    for (int32_t j = 0; j < taosArrayGetSize(pSubmitTbData->aRowP); j++) {
      SRow* pRow = taosArrayGetP(pSubmitTbData->aRowP, j);
      STREAM_CHECK_NULL_GOTO(pRow, terrno);
      SColVal tsVal = {0};
      STREAM_CHECK_RET_GOTO(tRowGet(pRow, schemas, 0, &tsVal));
      int64_t ts = VALUE_GET_TRIVIAL_DATUM(&tsVal.value);
      if (window != NULL && (ts < window->skey || ts > window->ekey)) {
        continue;
      }
      for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {  // reader todo test null
        if (i >= schemas->numOfCols) {
          break;
        }
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
        STREAM_CHECK_NULL_GOTO(pColData, terrno);
        SColVal colVal = {0};
        int32_t sourceIdx = 0;
        // Walk the source columns until one with id >= the destination id is reached.
        // The walk is bounded by the schema width; if it runs out, colVal keeps a smaller
        // id and the cell is written as NULL below.
        while (sourceIdx < schemas->numOfCols) {
          STREAM_CHECK_RET_GOTO(tRowGet(pRow, schemas, sourceIdx, &colVal));
          if (colVal.cid >= pColData->info.colId) {
            break;
          }
          sourceIdx++;
        }
        if (colVal.cid == pColData->info.colId && COL_VAL_IS_VALUE(&colVal)) {
          if (IS_VAR_DATA_TYPE(colVal.value.type) || colVal.value.type == TSDB_DATA_TYPE_DECIMAL){
            STREAM_CHECK_RET_GOTO(varColSetVarData(pColData, numOfRows, colVal.value.pData, colVal.value.nData, !COL_VAL_IS_VALUE(&colVal)));
          } else {
            STREAM_CHECK_RET_GOTO(colDataSetVal(pColData, numOfRows, (const char*)(&(colVal.value.val)), !COL_VAL_IS_VALUE(&colVal)));
          }
        } else {
          colDataSetNULL(pColData, numOfRows);
        }
      }
      numOfRows++;
    }
  }

  pBlock->info.rows = numOfRows;

end:
  taosMemoryFree(schemas);
  STREAM_PRINT_LOG_END(code, lino)
  return code;
}
355

356
/*
 * Appends one WAL_DELETE_DATA meta row to pBlock for table `uid`, covering the time range
 * [req->skey, req->ekey], with a row count of 1.
 *
 * The table is skipped (success, nothing appended) when it is not tracked: for virtual
 * tables that means uid is absent from the pTableList hash; otherwise it means
 * qStreamGetGroupId returned -1.
 *
 * On success pBlock->info.rows is incremented by one.
 */
static int32_t buildDeleteData(void* pTableList, bool isVTable, SSDataBlock* pBlock, SDeleteRes* req, int64_t uid, int64_t ver){
  int32_t  code = 0;
  int32_t  lino = 0;
  uint64_t groupId = 0;

  stDebug("stream reader scan delete start data:uid %" PRIu64 ", skey %" PRIu64 ", ekey %" PRIu64, uid, req->skey, req->ekey);

  if (!isVTable) {
    groupId = qStreamGetGroupId(pTableList, uid);
    STREAM_CHECK_CONDITION_GOTO(groupId == (uint64_t)-1, TDB_CODE_SUCCESS);
  } else {
    STREAM_CHECK_CONDITION_GOTO(taosHashGet(pTableList, &uid, sizeof(uid)) == NULL, TDB_CODE_SUCCESS)
  }

  STREAM_CHECK_RET_GOTO(
      buildWalMetaBlock(pBlock, WAL_DELETE_DATA, groupId, isVTable, uid, req->skey, req->ekey, ver, 1));
  pBlock->info.rows++;

  stDebug("stream reader scan delete end data:uid %" PRIu64 ", skey %" PRIu64 ", ekey %" PRIu64, uid, req->skey, req->ekey);

end:
  return code;
}
375

376
/*
 * Decodes a delete result from `data` and appends one delete meta row to pBlock for each
 * uid it lists (rows for untracked tables are skipped by buildDeleteData).
 *
 * The uid array is allocated here before decoding and destroyed before returning, as is
 * the decoder state.
 *
 * Returns 0 on success, terrno when the uid array cannot be allocated or an element cannot
 * be fetched, or the failing decode/build step's code.
 */
static int32_t scanDeleteData(void* pTableList, bool isVTable, SSDataBlock* pBlock, void* data, int32_t len,
                              int64_t ver) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SDecoder   decoder = {0};
  SDeleteRes req = {0};

  tDecoderInit(&decoder, data, len);
  req.uidList = taosArrayInit(0, sizeof(tb_uid_t));
  // fail early on allocation failure instead of decoding into a NULL list
  STREAM_CHECK_NULL_GOTO(req.uidList, terrno);
  STREAM_CHECK_RET_GOTO(tDecodeDeleteRes(&decoder, &req));

  for (int32_t i = 0; i < taosArrayGetSize(req.uidList); i++) {
    uint64_t* uid = taosArrayGet(req.uidList, i);
    STREAM_CHECK_NULL_GOTO(uid, terrno);
    STREAM_CHECK_RET_GOTO(buildDeleteData(pTableList, isVTable, pBlock, &req, *uid, ver));
  }

end:
  taosArrayDestroy(req.uidList);
  tDecoderClear(&decoder);
  return code;
}
397

398
/*
 * Decodes a batch drop-table request from `data` and appends one WAL_DELETE_TABLE meta row
 * to pBlock per dropped table that is tracked.
 *
 * A table counts as tracked when its uid is present in the pTableList hash (virtual
 * tables), or when qStreamGetGroupId does not return -1 (all other tables). Untracked
 * entries are skipped. Each appended row has skey = ekey = 0 and a row count of 1, and
 * bumps pBlock->info.rows.
 *
 * Returns 0 on success or the failing decode/build step's code.
 */
static int32_t scanDropTable(void* pTableList, bool isVTable, SSDataBlock* pBlock, void* data, int32_t len,
                             int64_t ver) {
  int32_t          code = 0;
  int32_t          lino = 0;
  SDecoder         decoder = {0};
  SVDropTbBatchReq req = {0};

  tDecoderInit(&decoder, data, len);
  STREAM_CHECK_RET_GOTO(tDecodeSVDropTbBatchReq(&decoder, &req));

  for (int32_t idx = 0; idx < req.nReqs; idx++) {
    SVDropTbReq* pOne = req.pReqs + idx;
    STREAM_CHECK_NULL_GOTO(pOne, TSDB_CODE_INVALID_PARA);

    uint64_t groupId = 0;
    if (!isVTable) {
      groupId = qStreamGetGroupId(pTableList, pOne->uid);
      if (groupId == (uint64_t)-1) {
        continue;
      }
    } else if (taosHashGet(pTableList, &pOne->uid, sizeof(pOne->uid)) == NULL) {
      continue;
    }

    STREAM_CHECK_RET_GOTO(buildWalMetaBlock(pBlock, WAL_DELETE_TABLE, groupId, isVTable, pOne->uid, 0, 0, ver, 1));
    pBlock->info.rows++;
    stDebug("stream reader scan drop :uid %" PRIu64 ", gid %" PRIu64, pOne->uid, groupId);
  }

end:
  tDecoderClear(&decoder);
  return code;
}
429

430
/*
 * Decodes a submit request from `data` and appends one meta row to pBlock for each table
 * payload it contains (payloads for untracked tables are skipped by retrieveWalMetaData).
 *
 * Before the payloads are visited, pBlock's capacity is grown to hold its current rows
 * plus one row per payload. The decoded request and the decoder are released before
 * returning, on both success and failure.
 */
static int32_t scanSubmitData(void* pTableList, bool isVTable, SSDataBlock* pBlock, void* data, int32_t len,
                              int64_t ver) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SSubmitReq2 submit = {0};
  SDecoder    decoder = {0};

  tDecoderInit(&decoder, data, len);
  STREAM_CHECK_RET_GOTO(tDecodeSubmitReq(&decoder, &submit, NULL));

  int32_t numOfBlocks = taosArrayGetSize(submit.aSubmitTbData);

  // worst case: every payload contributes one meta row
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, pBlock->info.rows + numOfBlocks));

  for (int32_t blkIdx = 0; blkIdx < numOfBlocks; blkIdx++) {
    stDebug("stream reader scan submit, next data block %d/%d", blkIdx, numOfBlocks);
    SSubmitTbData* pTbData = taosArrayGet(submit.aSubmitTbData, blkIdx);
    STREAM_CHECK_NULL_GOTO(pTbData, terrno);
    STREAM_CHECK_RET_GOTO(retrieveWalMetaData(pTbData, pTableList, isVTable, pBlock, ver));
  }

end:
  tDestroySubmitReq(&submit, TSDB_MSG_FLG_DECODE);
  tDecoderClear(&decoder);
  return code;
}
456

457
/*
 * Scans WAL entries after version `lastVer` and collects meta rows into pBlock.
 *
 * For each valid entry:
 *   - TDMT_VND_DELETE entries are scanned only when deleteData is non-zero;
 *   - TDMT_VND_DROP_TABLE entries are scanned only when deleteTb is non-zero;
 *   - TDMT_VND_SUBMIT entries are always scanned;
 *   - any other entry is ignored.
 *
 * The scan stops when the seek to lastVer + 1 fails, when no further valid message is
 * available, when an entry's ingestTs / 1000 exceeds `ctime`, or once pBlock holds at
 * least STREAM_RETURN_ROWS_NUM rows. Seek failure and end-of-log are reported as success.
 *
 * *retVer is refreshed with walGetAppliedVer before the seek and before each read.
 * The WAL reader opened here is closed before returning.
 */
static int32_t scanWal(SVnode* pVnode, void* pTableList, bool isVTable, SSDataBlock* pBlock, int64_t lastVer,
                       int8_t deleteData, int8_t deleteTb, int64_t ctime, int64_t* retVer) {
  int32_t code = 0;
  int32_t lino = 0;

  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);
  *retVer = walGetAppliedVer(pWalReader->pWal);
  STREAM_CHECK_CONDITION_GOTO(walReaderSeekVer(pWalReader, lastVer + 1) != 0, TSDB_CODE_SUCCESS);

  do {
    *retVer = walGetAppliedVer(pWalReader->pWal);
    STREAM_CHECK_CONDITION_GOTO(walNextValidMsg(pWalReader, true) < 0, TSDB_CODE_SUCCESS);

    SWalCont* wCont = &pWalReader->pHead->head;
    if (wCont->ingestTs / 1000 > ctime) {
      break;
    }

    // default payload view: everything after the generic message head
    void*   pBody = POINTER_SHIFT(wCont->body, sizeof(SMsgHead));
    int32_t bodyLen = wCont->bodyLen - sizeof(SMsgHead);
    int64_t walVer = wCont->version;

    stDebug("vgId:%d stream reader scan wal ver:%" PRId64 ", type:%d, deleteData:%d, deleteTb:%d",
      TD_VID(pVnode), walVer, wCont->msgType, deleteData, deleteTb);

    if (wCont->msgType == TDMT_VND_DELETE && deleteData != 0) {
      STREAM_CHECK_RET_GOTO(scanDeleteData(pTableList, isVTable, pBlock, pBody, bodyLen, walVer));
    } else if (wCont->msgType == TDMT_VND_DROP_TABLE && deleteTb != 0) {
      STREAM_CHECK_RET_GOTO(scanDropTable(pTableList, isVTable, pBlock, pBody, bodyLen, walVer));
    } else if (wCont->msgType == TDMT_VND_SUBMIT) {
      // submit entries carry a different head, so the payload view is recomputed
      pBody = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
      bodyLen = wCont->bodyLen - sizeof(SSubmitReq2Msg);
      STREAM_CHECK_RET_GOTO(scanSubmitData(pTableList, isVTable, pBlock, pBody, bodyLen, walVer));
    }
  } while (pBlock->info.rows < STREAM_RETURN_ROWS_NUM);

end:
  walCloseReader(pWalReader);
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
499

500
/*
 * Reads the single WAL entry at version `ver` and merges the rows belonging to table
 * `uid` into pBlockRet.
 *
 * The entry must be a TDMT_VND_SUBMIT message; otherwise
 * TSDB_CODE_STREAM_WAL_VER_NOT_DATA is returned. Each payload whose uid matches is
 * extracted into pBlock (optionally filtered by `window`), merged into pBlockRet, and
 * pBlock is then cleaned up for the next payload. Payloads for other tables are skipped.
 *
 * The WAL reader, the decoded submit request and the decoder are all released before
 * returning.
 */
int32_t scanWalOneVer(SVnode* pVnode, SSDataBlock* pBlock, SSDataBlock* pBlockRet,
                      int64_t ver, int64_t uid, STimeWindow* window) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SDecoder    decoder = {0};
  SSubmitReq2 submit = {0};

  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);

  STREAM_CHECK_RET_GOTO(walFetchHead(pWalReader, ver));
  STREAM_CHECK_CONDITION_GOTO(pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT, TSDB_CODE_STREAM_WAL_VER_NOT_DATA);
  STREAM_CHECK_RET_GOTO(walFetchBody(pWalReader));

  // the submit payload starts right after the submit message head
  int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
  void*   pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));

  tDecoderInit(&decoder, pBody, bodyLen);
  STREAM_CHECK_RET_GOTO(tDecodeSubmitReq(&decoder, &submit, NULL));

  int32_t numOfBlocks = taosArrayGetSize(submit.aSubmitTbData);
  for (int32_t blkIdx = 0; blkIdx < numOfBlocks; blkIdx++) {
    stDebug("stream reader next data block %d/%d", blkIdx, numOfBlocks);
    SSubmitTbData* pTbData = taosArrayGet(submit.aSubmitTbData, blkIdx);
    STREAM_CHECK_NULL_GOTO(pTbData, terrno);

    if (pTbData->uid == uid) {
      STREAM_CHECK_RET_GOTO(retrieveWalData(pVnode, pTbData, pBlock, window));
      printDataBlock(pBlock, __func__, "");

      STREAM_CHECK_RET_GOTO(blockDataMerge(pBlockRet, pBlock));
      blockDataCleanup(pBlock);
    } else {
      stDebug("stream reader skip data block uid:%" PRId64, pTbData->uid);
    }
  }

end:
  tDestroySubmitReq(&submit, TSDB_MSG_FLG_DECODE);
  walCloseReader(pWalReader);
  tDecoderClear(&decoder);
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
543

544
/*
 * Fills the tag and tbname pseudo-columns of pBlock for the table identified by
 * pBlock->info.id.uid, one output column per expression in pExpr.
 *
 * The table's meta entry is looked up once. If the lookup fails, an error is logged and
 * every target column is filled with NULL for all rows (detected via a NULL entry name).
 * Otherwise, for each expression:
 *   - a scan pseudo-column function of type TBNAME writes the table name and sets the
 *     column id to -1;
 *   - any other expression is treated as a tag: its value is extracted from the child
 *     table's tags and replicated into all rows, or NULL is written when the tag is
 *     absent or is a JSON null.
 *
 * Returns TSDB_CODE_SUCCESS immediately when numOfExpr is 0 (no meta reader is set up in
 * that case); otherwise 0 on success or the failing step's code. The meta reader is
 * cleared before returning.
 */
static int32_t processTag(SVnode* pVnode, SExprInfo* pExpr, int32_t numOfExpr, SStorageAPI* api, SSDataBlock* pBlock) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SMetaReader mr = {0};

  if (numOfExpr == 0) {
    return TSDB_CODE_SUCCESS;
  }
  api->metaReaderFn.initReader(&mr, pVnode, META_READER_LOCK, &api->metaFn);
  code = api->metaReaderFn.getEntryGetUidCache(&mr, pBlock->info.id.uid);
  if (code != TSDB_CODE_SUCCESS) {
    // not fatal: the loop below writes NULLs because mr.me.name stays NULL
    stError("failed to get table meta, uid:%" PRId64 ", code:%s", pBlock->info.id.uid, tstrerror(code));
  }
  api->metaReaderFn.readerReleaseLock(&mr);
  for (int32_t j = 0; j < numOfExpr; ++j) {
    const SExprInfo* pExpr1 = &pExpr[j];
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
    STREAM_CHECK_NULL_GOTO(pColInfoData, terrno);
    if (mr.me.name == NULL) {
      colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
      continue;
    }
    colInfoDataCleanup(pColInfoData, pBlock->info.rows);

    int32_t functionId = pExpr1->pExpr->_function.functionId;

    // this is to handle the tbname
    if (fmIsScanPseudoColumnFunc(functionId)) {
      int32_t fType = pExpr1->pExpr->_function.functionType;
      if (fType == FUNCTION_TYPE_TBNAME) {
        STREAM_CHECK_RET_GOTO(setTbNameColData(pBlock, pColInfoData, functionId, mr.me.name));
        pColInfoData->info.colId = -1;
      }
    } else {  // these are tags
      STagVal tagVal = {0};
      tagVal.cid = pExpr1->base.pParam[0].pCol->colId;
      const char* p = api->metaFn.extractTagVal(mr.me.ctbEntry.pTags, pColInfoData->info.type, &tagVal);

      bool  isJsonCol = (pColInfoData->info.type == TSDB_DATA_TYPE_JSON);
      char* data = NULL;
      if (!isJsonCol && p != NULL) {
        data = tTagValToData((const STagVal*)p, false);
      } else {
        // JSON columns (and missing tags) use the extracted pointer as-is
        data = (char*)p;
      }

      bool isNullVal = (data == NULL) || (isJsonCol && tTagIsJsonNull(data));
      if (isNullVal) {
        colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
      } else {
        code = colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows, false);
        // Only a buffer produced by tTagValToData may be freed here. For JSON columns
        // `data` aliases `p`, which was not produced by that call, so `p` must not be
        // read as an STagVal nor freed.
        if (!isJsonCol && IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
          taosMemoryFree(data);
        }
        STREAM_CHECK_RET_GOTO(code);
      }
    }
  }

end:
  api->metaReaderFn.clearReader(&mr);

  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
610

611
/*
 * Produces the filtered data of table `uid` found in WAL version `ver`.
 *
 * Two working blocks shaped like sStreamInfo->triggerResBlock are created: a scratch block
 * used per payload and a merged block that accumulates the result. After the WAL entry has
 * been scanned, tag/tbname columns are filled in (only when rows were found) and the
 * stream's condition filter is applied to the merged block.
 *
 * Output through *pBlock:
 *   - isTrigger == true : *pBlock receives the merged block itself (ownership moves out);
 *   - isTrigger == false: *pBlock is created up front as a copy of pSrcBlock's layout and
 *     is filled from the merged block with blockDataTransform.
 *
 * The filter and any working block still owned here are released before returning.
 * NOTE(review): when isTrigger is false and a later step fails, *pBlock stays allocated;
 * presumably the caller releases it — confirm.
 */
static int32_t processWalVerData(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamInfo, int64_t ver, bool isTrigger,
                                 int64_t uid, STimeWindow* window, SSDataBlock* pSrcBlock, SSDataBlock** pBlock) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SSDataBlock* pScratch = NULL;
  SSDataBlock* pMerged = NULL;
  SFilterInfo* pFilterInfo = NULL;

  int32_t    numOfExpr = sStreamInfo->numOfExpr;
  SExprInfo* pExpr = sStreamInfo->pExprInfo;

  STREAM_CHECK_RET_GOTO(filterInitFromNode(sStreamInfo->pConditions, &pFilterInfo, 0, NULL));

  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamInfo->triggerResBlock, false, &pScratch));
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamInfo->triggerResBlock, false, &pMerged));
  if (!isTrigger) {
    STREAM_CHECK_RET_GOTO(createOneDataBlock(pSrcBlock, false, pBlock));
  }

  pMerged->info.id.uid = uid;
  pScratch->info.id.uid = uid;

  STREAM_CHECK_RET_GOTO(scanWalOneVer(pVnode, pScratch, pMerged, ver, uid, window));

  if (pMerged->info.rows > 0) {
    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(processTag(pVnode, pExpr, numOfExpr, &api, pMerged));
  }
  STREAM_CHECK_RET_GOTO(qStreamFilter(pMerged, pFilterInfo));

  if (isTrigger) {
    // hand the merged block to the caller; clear the local so it is not destroyed below
    *pBlock = pMerged;
    pMerged = NULL;
  } else {
    blockDataTransform(*pBlock, pMerged);
  }

  printDataBlock(*pBlock, __func__, "processWalVerData2");

end:
  STREAM_PRINT_LOG_END(code, lino);
  filterFreeInfo(pFilterInfo);
  blockDataDestroy(pScratch);
  blockDataDestroy(pMerged);
  return code;
}
655

656
// Collects the column schema of table `uid` into a freshly created SArray of SSchema.
//   - child table : the schema is read from the super table entry found via ctbEntry.suid
//   - normal table: the schema is read from the table's own entry
//   - other types : rejected with TSDB_CODE_INVALID_PARA
// On success *schemas holds the array and the caller releases it with taosArrayDestroy().
// On failure the array is released here and *schemas is reset to NULL, so a caller whose
// cleanup path also destroys `schemas` does not free it a second time.
static int32_t buildScheamFromMeta(SVnode* pVnode, int64_t uid, SArray** schemas) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SMetaReader metaReader = {0};
  SStorageAPI api = {0};
  initStorageAPI(&api);
  *schemas = taosArrayInit(8, sizeof(SSchema));
  STREAM_CHECK_NULL_GOTO(*schemas, terrno);

  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
  STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, uid));

  SSchemaWrapper* sSchemaWrapper = NULL;
  if (metaReader.me.type == TD_CHILD_TABLE) {
    // remember the suid before the decoder is cleared, then re-use the reader for the super table
    int64_t suid = metaReader.me.ctbEntry.suid;
    tDecoderClear(&metaReader.coder);
    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, suid));
    sSchemaWrapper = &metaReader.me.stbEntry.schemaRow;
  } else if (metaReader.me.type == TD_NORMAL_TABLE) {
    sSchemaWrapper = &metaReader.me.ntbEntry.schemaRow;
  } else {
    qError("invalid table type:%d", metaReader.me.type);
  }
  // unsupported table type: fail instead of dereferencing a NULL wrapper in the loop below
  STREAM_CHECK_NULL_GOTO(sSchemaWrapper, TSDB_CODE_INVALID_PARA);

  for (size_t j = 0; j < sSchemaWrapper->nCols; j++) {
    SSchema* s = sSchemaWrapper->pSchema + j;
    STREAM_CHECK_NULL_GOTO(taosArrayPush(*schemas, s), terrno);
  }

end:
  api.metaReaderFn.clearReader(&metaReader);
  STREAM_PRINT_LOG_END(code, lino);
  if (code != 0) {
    taosArrayDestroy(*schemas);
    *schemas = NULL;  // do not leave a dangling pointer behind for the caller's cleanup
  }
  return code;
}
691

692
// Filters `schemas` (array of SSchema) in place: an entry is kept only if its colId
// appears in `cols` (array of col_id_t); all other entries are removed.
// Returns 0, or terrno when an element lookup unexpectedly yields NULL.
static int32_t shrinkScheams(SArray* cols, SArray* schemas) {
  int32_t code = 0;
  int32_t lino = 0;

  size_t pos = 0;
  while (pos < taosArrayGetSize(schemas)) {
    SSchema* pSchema = taosArrayGet(schemas, pos);
    STREAM_CHECK_NULL_GOTO(pSchema, terrno);

    // linear search for this column id among the requested ones
    bool   wanted = false;
    size_t nCols = taosArrayGetSize(cols);
    for (size_t k = 0; k < nCols && !wanted; k++) {
      col_id_t* pId = taosArrayGet(cols, k);
      STREAM_CHECK_NULL_GOTO(pId, terrno);
      wanted = (*pId == pSchema->colId);
    }

    if (wanted) {
      pos++;
    } else {
      // not requested: drop it; the next entry slides into `pos`, so do not advance
      taosArrayRemove(schemas, pos);
    }
  }

end:
  return code;
}
717

718
// Scans WAL version `ver` for table `uid` into a block whose columns are the table's
// schema (taken from meta) restricted to the column ids in `cids`.
// On success *pBlock owns the scanned block; on failure *pBlock is left untouched.
static int32_t processWalVerDataVTable(SVnode* pVnode, SArray *cids, int64_t ver,
  int64_t uid, STimeWindow* window, SSDataBlock** pBlock) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SArray*      schemas = NULL;
  SSDataBlock* pAuxBlock = NULL;   // first block handed to scanWalOneVer; always released below
  SSDataBlock* pDataBlock = NULL;  // second block handed to scanWalOneVer; returned on success

  // table schema from meta, narrowed to the requested columns
  STREAM_CHECK_RET_GOTO(buildScheamFromMeta(pVnode, uid, &schemas));
  STREAM_CHECK_RET_GOTO(shrinkScheams(cids, schemas));

  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, &pAuxBlock));
  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, &pDataBlock));

  pDataBlock->info.id.uid = uid;
  pAuxBlock->info.id.uid = uid;

  STREAM_CHECK_RET_GOTO(scanWalOneVer(pVnode, pAuxBlock, pDataBlock, ver, uid, window));
  printDataBlock(pDataBlock, __func__, "");

  // transfer ownership; clearing the local keeps the cleanup below from freeing it
  *pBlock = pDataBlock;
  pDataBlock = NULL;

end:
  STREAM_PRINT_LOG_END(code, lino);
  blockDataDestroy(pAuxBlock);
  blockDataDestroy(pDataBlock);
  taosArrayDestroy(schemas);
  return code;
}
748

749
static int32_t createTSAndCondition(int64_t start, int64_t end, SLogicConditionNode** pCond,
298✔
750
                                    STargetNode* pTargetNodeTs) {
751
  int32_t code = 0;
298✔
752
  int32_t lino = 0;
298✔
753

754
  SColumnNode*         pCol = NULL;
298✔
755
  SColumnNode*         pCol1 = NULL;
298✔
756
  SValueNode*          pVal = NULL;
298✔
757
  SValueNode*          pVal1 = NULL;
298✔
758
  SOperatorNode*       op = NULL;
298✔
759
  SOperatorNode*       op1 = NULL;
298✔
760
  SLogicConditionNode* cond = NULL;
298✔
761

762
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pCol));
298!
763
  pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
298✔
764
  pCol->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
298✔
765
  pCol->node.resType.bytes = LONG_BYTES;
298✔
766
  pCol->slotId = pTargetNodeTs->slotId;
298✔
767
  pCol->dataBlockId = pTargetNodeTs->dataBlockId;
298✔
768

769
  STREAM_CHECK_RET_GOTO(nodesCloneNode((SNode*)pCol, (SNode**)&pCol1));
298!
770

771
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&pVal));
298!
772
  pVal->node.resType.type = TSDB_DATA_TYPE_BIGINT;
298✔
773
  pVal->node.resType.bytes = LONG_BYTES;
298✔
774
  pVal->datum.i = start;
298✔
775
  pVal->typeData = start;
298✔
776

777
  STREAM_CHECK_RET_GOTO(nodesCloneNode((SNode*)pVal, (SNode**)&pVal1));
298!
778
  pVal1->datum.i = end;
298✔
779
  pVal1->typeData = end;
298✔
780

781
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&op));
298!
782
  op->opType = OP_TYPE_GREATER_EQUAL;
298✔
783
  op->node.resType.type = TSDB_DATA_TYPE_BOOL;
298✔
784
  op->node.resType.bytes = CHAR_BYTES;
298✔
785
  op->pLeft = (SNode*)pCol;
298✔
786
  op->pRight = (SNode*)pVal;
298✔
787
  pCol = NULL;
298✔
788
  pVal = NULL;
298✔
789

790
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&op1));
298!
791
  op1->opType = OP_TYPE_LOWER_EQUAL;
298✔
792
  op1->node.resType.type = TSDB_DATA_TYPE_BOOL;
298✔
793
  op1->node.resType.bytes = CHAR_BYTES;
298✔
794
  op1->pLeft = (SNode*)pCol1;
298✔
795
  op1->pRight = (SNode*)pVal1;
298✔
796
  pCol1 = NULL;
298✔
797
  pVal1 = NULL;
298✔
798

799
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_LOGIC_CONDITION, (SNode**)&cond));
298!
800
  cond->condType = LOGIC_COND_TYPE_AND;
298✔
801
  cond->node.resType.type = TSDB_DATA_TYPE_BOOL;
298✔
802
  cond->node.resType.bytes = CHAR_BYTES;
298✔
803
  STREAM_CHECK_RET_GOTO(nodesMakeList(&cond->pParameterList));
298!
804
  STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)op));
298!
805
  op = NULL;
298✔
806
  STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)op1));
298!
807
  op1 = NULL;
298✔
808

809
  *pCond = cond;
298✔
810

811
end:
298✔
812
  if (code != 0) {
298!
813
    nodesDestroyNode((SNode*)pCol);
×
814
    nodesDestroyNode((SNode*)pCol1);
×
815
    nodesDestroyNode((SNode*)pVal);
×
816
    nodesDestroyNode((SNode*)pVal1);
×
817
    nodesDestroyNode((SNode*)op);
×
818
    nodesDestroyNode((SNode*)op1);
×
819
    nodesDestroyNode((SNode*)cond);
×
820
  }
821
  STREAM_PRINT_LOG_END(code, lino);
298!
822

823
  return code;
298✔
824
}
825

826
// Builds an OR node whose children are one `ts >= skey AND ts <= ekey` condition per entry
// of data->pStreamPesudoFuncVals. For each entry data->curIdx is set to its index and
// calcTimeRange() computes the range; entries with an invalid range are logged and skipped.
// Returns TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN when pTargetNodeTs is NULL.
// On success *pCond owns the OR node; on failure it is destroyed and *pCond is untouched.
static int32_t createExternalConditions(SStreamRuntimeFuncInfo* data, SLogicConditionNode** pCond, STargetNode* pTargetNodeTs, STimeRangeNode* node) {
  int32_t              code = 0;
  int32_t              lino = 0;
  SLogicConditionNode* pRangeCond = NULL;  // AND condition currently being attached
  SLogicConditionNode* pOrCond = NULL;     // result node

  if (pTargetNodeTs == NULL) {
    vError("stream reader %s no ts column", __func__);
    return TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN;
  }

  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_LOGIC_CONDITION, (SNode**)&pOrCond));
  pOrCond->condType = LOGIC_COND_TYPE_OR;
  pOrCond->node.resType.type = TSDB_DATA_TYPE_BOOL;
  pOrCond->node.resType.bytes = CHAR_BYTES;
  STREAM_CHECK_RET_GOTO(nodesMakeList(&pOrCond->pParameterList));

  for (int i = 0; i < taosArrayGetSize(data->pStreamPesudoFuncVals); ++i) {
    // calcTimeRange is evaluated against the entry selected by curIdx
    data->curIdx = i;

    SReadHandle handle = {0};
    calcTimeRange(node, data, &handle.winRange, &handle.winRangeValid);
    if (!handle.winRangeValid) {
      stError("stream reader %s invalid time range, skey:%" PRId64 ", ekey:%" PRId64, __func__, handle.winRange.skey,
              handle.winRange.ekey);
      continue;
    }

    STREAM_CHECK_RET_GOTO(createTSAndCondition(handle.winRange.skey, handle.winRange.ekey, &pRangeCond, pTargetNodeTs));
    stDebug("%s create condition skey:%" PRId64 ", eksy:%" PRId64, __func__, handle.winRange.skey, handle.winRange.ekey);
    STREAM_CHECK_RET_GOTO(nodesListAppend(pOrCond->pParameterList, (SNode*)pRangeCond));
    pRangeCond = NULL;  // now owned by the OR node's parameter list
  }

  *pCond = pOrCond;

end:
  if (code != 0) {
    nodesDestroyNode((SNode*)pRangeCond);
    nodesDestroyNode((SNode*)pOrCond);
  }
  STREAM_PRINT_LOG_END(code, lino);

  return code;
}
869

870
static int32_t processCalaTimeRange(SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo, SResFetchReq* req,
14,461✔
871
                                    STimeRangeNode* node, SReadHandle* handle) {
872
  int32_t code = 0;
14,461✔
873
  int32_t lino = 0;
14,461✔
874
  SArray* funcVals = NULL;
14,461✔
875
  if (req->pStRtFuncInfo->withExternalWindow) {
14,461✔
876
    nodesDestroyNode(sStreamReaderCalcInfo->tsConditions);
98✔
877
    filterFreeInfo(sStreamReaderCalcInfo->pFilterInfo);
98✔
878
    sStreamReaderCalcInfo->pFilterInfo = NULL;
98✔
879

880
    STREAM_CHECK_RET_GOTO(createExternalConditions(req->pStRtFuncInfo,
98!
881
                                                   (SLogicConditionNode**)&sStreamReaderCalcInfo->tsConditions,
882
                                                   sStreamReaderCalcInfo->pTargetNodeTs, node));
883

884
    STREAM_CHECK_RET_GOTO(filterInitFromNode((SNode*)sStreamReaderCalcInfo->tsConditions,
98!
885
                                             (SFilterInfo**)&sStreamReaderCalcInfo->pFilterInfo,
886
                                             FLT_OPTION_NO_REWRITE | FLT_OPTION_SCALAR_MODE, NULL));
887
    SSTriggerCalcParam* pFirst = taosArrayGet(req->pStRtFuncInfo->pStreamPesudoFuncVals, 0);
98✔
888
    SSTriggerCalcParam* pLast = taosArrayGetLast(req->pStRtFuncInfo->pStreamPesudoFuncVals);
98✔
889
    STREAM_CHECK_NULL_GOTO(pFirst, terrno);
98!
890
    STREAM_CHECK_NULL_GOTO(pLast, terrno);
98!
891

892
    handle->winRange.skey = pFirst->wstart;
98✔
893
    handle->winRange.ekey = pLast->wend;
98✔
894
    handle->winRangeValid = true;
98✔
895
    stDebug("%s withExternalWindow is true, skey:%" PRId64 ", ekey:%" PRId64, __func__, pFirst->wstart, pLast->wend);
98✔
896
  } else {
897
    calcTimeRange(node, req->pStRtFuncInfo, &handle->winRange, &handle->winRangeValid);
14,363✔
898
  }
899

900
end:
14,461✔
901
  taosArrayDestroy(funcVals);
14,461✔
902
  return code;
14,461✔
903
}
904

905
// Creates the result block used for WAL meta rows. Columns, in order:
//   type (TINYINT), gid (BIGINT, omitted when isVTable), uid, skey, ekey, ver, nrows (all BIGINT).
// The running counter is passed as the last argument of qStreamBuildSchema for each column.
// On success *pBlock holds the new block.
static int32_t createBlockForWalMeta(SSDataBlock** pBlock, bool isVTable) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t colSeq = 0;

  SArray* pSchemaList = taosArrayInit(8, sizeof(SSchema));
  STREAM_CHECK_NULL_GOTO(pSchemaList, terrno);

  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(pSchemaList, TSDB_DATA_TYPE_TINYINT, CHAR_BYTES, colSeq++));  // type
  if (!isVTable) {
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(pSchemaList, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, colSeq++));  // gid
  }
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(pSchemaList, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, colSeq++));  // uid
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(pSchemaList, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, colSeq++));  // skey
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(pSchemaList, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, colSeq++));  // ekey
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(pSchemaList, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, colSeq++));  // ver
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(pSchemaList, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, colSeq++));  // nrows

  STREAM_CHECK_RET_GOTO(createDataBlockForStream(pSchemaList, pBlock));

end:
  // the schema list is only needed while building the block
  taosArrayDestroy(pSchemaList);
  return code;
}
930

931
static int32_t createOptionsForLastTs(SStreamTriggerReaderTaskInnerOptions* options,
293✔
932
                                      SStreamTriggerReaderInfo*             sStreamReaderInfo) {
933
  int32_t code = 0;
293✔
934
  int32_t lino = 0;
293✔
935
  SArray* schemas = NULL;
293✔
936

937
  schemas = taosArrayInit(4, sizeof(SSchema));
293✔
938
  STREAM_CHECK_NULL_GOTO(schemas, terrno)
293!
939
  STREAM_CHECK_RET_GOTO(
293!
940
      qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, PRIMARYKEY_TIMESTAMP_COL_ID))  // last ts
941

942
  BUILD_OPTION(op, sStreamReaderInfo, -1, true, TSDB_ORDER_DESC, INT64_MIN, INT64_MAX, schemas, true,
294✔
943
               STREAM_SCAN_GROUP_ONE_BY_ONE, 0, sStreamReaderInfo->uidList == NULL, NULL);
944
  schemas = NULL;
294✔
945
  *options = op;
294✔
946

947
end:
294✔
948
  taosArrayDestroy(schemas);
294✔
949
  return code;
293✔
950
}
951

952
static int32_t createOptionsForFirstTs(SStreamTriggerReaderTaskInnerOptions* options,
327✔
953
                                       SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t start, int64_t ver) {
954
  int32_t code = 0;
327✔
955
  int32_t lino = 0;
327✔
956
  SArray* schemas = NULL;
327✔
957

958
  schemas = taosArrayInit(4, sizeof(SSchema));
327✔
959
  STREAM_CHECK_NULL_GOTO(schemas, terrno)
327!
960
  STREAM_CHECK_RET_GOTO(
327!
961
      qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, PRIMARYKEY_TIMESTAMP_COL_ID))  // first ts
962

963
  BUILD_OPTION(op, sStreamReaderInfo, ver, true, TSDB_ORDER_ASC, start, INT64_MAX, schemas, true,
327✔
964
               STREAM_SCAN_GROUP_ONE_BY_ONE, 0, sStreamReaderInfo->uidList == NULL, NULL);
965
  schemas = NULL;
327✔
966

967
  *options = op;
327✔
968
end:
327✔
969
  taosArrayDestroy(schemas);
327✔
970
  return code;
327✔
971
}
972

973
static int32_t createOptionsForTsdbMeta(SStreamTriggerReaderTaskInnerOptions* options,
1,340✔
974
                                        SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t start, int64_t end,
975
                                        int64_t gid, int8_t order, int64_t ver, bool onlyTs) {
976
  int32_t code = 0;
1,340✔
977
  int32_t lino = 0;
1,340✔
978
  SArray* schemas = NULL;
1,340✔
979

980
  int32_t index = 1;
1,340✔
981
  schemas = taosArrayInit(8, sizeof(SSchema));
1,340✔
982
  STREAM_CHECK_NULL_GOTO(schemas, terrno);
1,340!
983
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, index++))  // skey
1,340!
984
  if (!onlyTs){
1,340✔
985
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, index++))  // ekey
670!
986
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // uid
670!
987
    if (sStreamReaderInfo->uidList == NULL) {
670✔
988
      STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_UBIGINT, LONG_BYTES, index++))  // gid
130!
989
    }
990
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))     // nrows
670!
991
  }
992
  
993
  BUILD_OPTION(op, sStreamReaderInfo, ver, true, order, start, end, schemas, true, (gid != 0 ? STREAM_SCAN_GROUP_ONE_BY_ONE : STREAM_SCAN_ALL), gid,
1,340✔
994
               true, sStreamReaderInfo->uidList);
995
  schemas = NULL;
1,340✔
996
  *options = op;
1,340✔
997

998
end:
1,340✔
999
  taosArrayDestroy(schemas);
1,340✔
1000
  return code;
1,340✔
1001
}
1002

1003
// Ascending comparator for sorting: compares the int64_t values that elem1 and elem2 point
// to and returns -1, 0 or 1 when the first is smaller than, equal to or greater than the second.
static int taosCompareInt64Asc(const void* elem1, const void* elem2) {
  const int64_t lhs = *(const int64_t*)elem1;
  const int64_t rhs = *(const int64_t*)elem2;

  // (a > b) - (a < b) yields exactly -1 / 0 / 1 without risking subtraction overflow
  return (lhs > rhs) - (lhs < rhs);
}
1013

1014
static int32_t getTableList(SArray** pList, int32_t* pNum, int64_t* suid, int32_t index,
55✔
1015
                            SStreamTriggerReaderInfo* sStreamReaderInfo) {
1016
  int32_t code = 0;
55✔
1017
  int32_t lino = 0;
55✔
1018

1019
  int32_t* start = taosArrayGet(sStreamReaderInfo->uidListIndex, index);
55✔
1020
  STREAM_CHECK_NULL_GOTO(start, terrno);
55!
1021
  int32_t  iStart = *start;
55✔
1022
  int32_t* end = taosArrayGet(sStreamReaderInfo->uidListIndex, index + 1);
55✔
1023
  STREAM_CHECK_NULL_GOTO(end, terrno);
55!
1024
  int32_t iEnd = *end;
55✔
1025
  *pList = taosArrayInit(iEnd - iStart, sizeof(STableKeyInfo));
55✔
1026
  STREAM_CHECK_NULL_GOTO(*pList, terrno);
55!
1027
  for (int32_t i = iStart; i < iEnd; ++i) {
188✔
1028
    int64_t*       uid = taosArrayGet(sStreamReaderInfo->uidList, i);
133✔
1029
    STableKeyInfo* info = taosArrayReserve(*pList, 1);
133✔
1030
    STREAM_CHECK_NULL_GOTO(info, terrno);
133!
1031
    *suid = uid[0];
133✔
1032
    info->uid = uid[1];
133✔
1033
  }
1034
  *pNum = iEnd - iStart;
55✔
1035
end:
55✔
1036
  STREAM_PRINT_LOG_END(code, lino);
55!
1037
  return code;
55✔
1038
}
1039

1040
static int32_t processTsNonVTable(SVnode* pVnode, SStreamTsResponse* tsRsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
564✔
1041
                                  SStreamReaderTaskInner* pTask) {
1042
  int32_t code = 0;
564✔
1043
  int32_t lino = 0;
564✔
1044

1045
  tsRsp->tsInfo = taosArrayInit(qStreamGetTableListGroupNum(pTask->pTableList), sizeof(STsInfo));
564✔
1046
  STREAM_CHECK_NULL_GOTO(tsRsp->tsInfo, terrno);
566!
1047
  while (true) {
553✔
1048
    bool hasNext = false;
1,119✔
1049
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTask, &hasNext));
1,119!
1050
    if (hasNext) {
1,117✔
1051
      pTask->api.tsdReader.tsdReaderReleaseDataBlock(pTask->pReader);
664✔
1052
      STsInfo* tsInfo = taosArrayReserve(tsRsp->tsInfo, 1);
664✔
1053
      STREAM_CHECK_NULL_GOTO(tsInfo, terrno)
664!
1054
      if (pTask->options.order == TSDB_ORDER_ASC) {
664✔
1055
        tsInfo->ts = pTask->pResBlock->info.window.skey;
544✔
1056
      } else {
1057
        tsInfo->ts = pTask->pResBlock->info.window.ekey;
120✔
1058
      }
1059
      tsInfo->gId = qStreamGetGroupId(pTask->pTableList, pTask->pResBlock->info.id.uid);
664✔
1060
      stDebug("vgId:%d %s get last ts:%" PRId64 ", gId:%" PRIu64 ", ver:%" PRId64, TD_VID(pVnode), __func__, tsInfo->ts,
665✔
1061
              tsInfo->gId, tsRsp->ver);
1062
    }
1063

1064
    pTask->currentGroupIndex++;
1,119✔
1065
    if (pTask->currentGroupIndex >= qStreamGetTableListGroupNum(pTask->pTableList)) {
1,119✔
1066
      break;
565✔
1067
    }
1068
    STREAM_CHECK_RET_GOTO(resetTsdbReader(pTask));
553!
1069
  }
1070

1071
end:
565✔
1072
  return code;
565✔
1073
}
1074

1075
static int32_t processTsVTable(SVnode* pVnode, SStreamTsResponse* tsRsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
55✔
1076
                               SStreamReaderTaskInner* pTask, int64_t ver) {
1077
  int32_t code = 0;
55✔
1078
  int32_t lino = 0;
55✔
1079
  int64_t suid = 0;
55✔
1080
  SArray* pList = NULL;
55✔
1081
  int32_t pNum = 0;
55✔
1082

1083
  tsRsp->tsInfo = taosArrayInit(taosArrayGetSize(sStreamReaderInfo->uidList), sizeof(STsInfo));
55✔
1084
  STREAM_CHECK_NULL_GOTO(tsRsp->tsInfo, terrno);
55!
1085
  for (int32_t i = 0; i < taosArrayGetSize(sStreamReaderInfo->uidListIndex) - 1; ++i) {
110✔
1086
    STREAM_CHECK_RET_GOTO(getTableList(&pList, &pNum, &suid, i, sStreamReaderInfo));
55!
1087

1088
    cleanupQueryTableDataCond(&pTask->cond);
55✔
1089
    STREAM_CHECK_RET_GOTO(qStreamInitQueryTableDataCond(&pTask->cond, pTask->options.order, pTask->options.schemas,
55!
1090
                                                        pTask->options.isSchema, pTask->options.twindows, suid, ver));
1091
    STREAM_CHECK_RET_GOTO(pTask->api.tsdReader.tsdReaderOpen(
55!
1092
        pVnode, &pTask->cond, taosArrayGet(pList, 0), pNum, pTask->pResBlock, (void**)&pTask->pReader, pTask->idStr, NULL));
1093
    taosArrayDestroy(pList);
55✔
1094
    pList = NULL;
55✔
1095
    while (true) {
91✔
1096
      bool hasNext = false;
146✔
1097
      STREAM_CHECK_RET_GOTO(getTableDataInfo(pTask, &hasNext));
146!
1098
      if (!hasNext) {
146✔
1099
        break;
55✔
1100
      }
1101
      pTask->api.tsdReader.tsdReaderReleaseDataBlock(pTask->pReader);
91✔
1102
      STsInfo* tsInfo = taosArrayReserve(tsRsp->tsInfo, 1);
91✔
1103
      STREAM_CHECK_NULL_GOTO(tsInfo, terrno)
91!
1104
      if (pTask->options.order == TSDB_ORDER_ASC) {
91✔
1105
        tsInfo->ts = pTask->pResBlock->info.window.skey;
73✔
1106
      } else {
1107
        tsInfo->ts = pTask->pResBlock->info.window.ekey;
18✔
1108
      }
1109
      tsInfo->gId = pTask->pResBlock->info.id.uid;
91✔
1110
      stDebug("vgId:%d %s get vtable last ts:%" PRId64 ", uid:%" PRIu64 ", ver:%" PRId64, TD_VID(pVnode), __func__,
91✔
1111
              tsInfo->ts, tsInfo->gId, tsRsp->ver);
1112
    }
1113
    pTask->api.tsdReader.tsdReaderClose(pTask->pReader);
55✔
1114
    pTask->pReader = NULL;
55✔
1115
  }
1116

1117
end:
55✔
1118
  taosArrayDestroy(pList);
55✔
1119
  return code;
55✔
1120
}
1121

1122
// Points `options` at a specific table: uid is always overwritten, suid only when the given
// suid is non-zero. tableType is then derived from the resulting options->suid
// (non-zero -> TD_CHILD_TABLE, zero -> TD_NORMAL_TABLE).
static void reSetUid(SStreamTriggerReaderTaskInnerOptions* options, int64_t suid, int64_t uid) {
  options->uid = uid;
  if (suid != 0) {
    options->suid = suid;
  }
  options->tableType = (options->suid != 0) ? TD_CHILD_TABLE : TD_NORMAL_TABLE;
}
334✔
1131

1132
static int32_t createOptionsForTsdbData(SVnode* pVnode, SStreamTriggerReaderTaskInnerOptions* options,
298✔
1133
                                        SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t uid, SArray* cols,
1134
                                        int8_t order,int64_t skey, int64_t ekey, int64_t ver) {
1135
  int32_t code = 0;
298✔
1136
  int32_t lino = 0;
298✔
1137
  SArray* schemas = NULL;
298✔
1138

1139
  STREAM_CHECK_RET_GOTO(buildScheamFromMeta(pVnode, uid, &schemas));
298!
1140
  STREAM_CHECK_RET_GOTO(shrinkScheams(cols, schemas));
298!
1141
  BUILD_OPTION(op, sStreamReaderInfo, ver, true, order, skey, ekey, schemas, true, STREAM_SCAN_ALL, 0, false, NULL);
298✔
1142
  *options = op;
298✔
1143

1144
end:
298✔
1145
  return code;
298✔
1146
}
1147

1148
// Handles the "set table" pull request: takes over the uid list carried by the request
// (swapping it with the reader's current one), sorts it, and rebuilds two lookup structures:
//   uidListIndex - the start index of every run of entries sharing the same leading value
//                  (entries whose leading value is 0 each start their own run), followed by
//                  the total entry count as the closing boundary
//   uidHash      - maps the second int64_t of each entry to the entry's index in uidList
// Each uidList entry is read as a pair of int64_t values: [0] = suid, [1] = uid.
// A response carrying only the result code is always sent.
static int32_t vnodeProcessStreamSetTableReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;

  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  TSWAP(sStreamReaderInfo->uidList, req->setTableReq.uids);
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidList, TSDB_CODE_INVALID_PARA);

  taosArraySort(sStreamReaderInfo->uidList, taosCompareInt64Asc);

  // reuse the boundary index if it already exists, otherwise create it
  if (sStreamReaderInfo->uidListIndex == NULL) {
    sStreamReaderInfo->uidListIndex = taosArrayInit(taosArrayGetSize(sStreamReaderInfo->uidList), sizeof(int32_t));
    STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidListIndex, TSDB_CODE_INVALID_PARA);
  } else {
    taosArrayClear(sStreamReaderInfo->uidListIndex);
  }

  // same for the uid -> index hash
  if (sStreamReaderInfo->uidHash == NULL) {
    sStreamReaderInfo->uidHash = taosHashInit(taosArrayGetSize(sStreamReaderInfo->uidList),
                                              taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidHash, TSDB_CODE_INVALID_PARA);
  } else {
    taosHashClear(sStreamReaderInfo->uidHash);
  }

  int64_t prevSuid = 0;
  int32_t nEntries = taosArrayGetSize(sStreamReaderInfo->uidList);
  for (int32_t i = 0; i < nEntries; ++i) {
    int64_t* pPair = taosArrayGet(sStreamReaderInfo->uidList, i);
    STREAM_CHECK_NULL_GOTO(pPair, terrno);

    // a new run starts whenever the leading value changes, and for every entry whose leading value is 0
    bool startsRun = (*pPair == 0) || (*pPair != prevSuid);
    if (startsRun) {
      STREAM_CHECK_NULL_GOTO(taosArrayPush(sStreamReaderInfo->uidListIndex, &i), terrno);
    }
    prevSuid = *pPair;
    stDebug("vgId:%d %s suid:%" PRId64 ",uid:%" PRId64 ", index:%d", TD_VID(pVnode), __func__, prevSuid, pPair[1], i);
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->uidHash, pPair + 1, LONG_BYTES, &i, sizeof(int32_t)));
  }
  // closing boundary so that index[k]..index[k + 1] is always a valid range
  STREAM_CHECK_NULL_GOTO(taosArrayPush(sStreamReaderInfo->uidListIndex, &nEntries), terrno);

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  // this request has no payload to return: the response carries only the result code
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = NULL, .contLen = 0, .code = code};
  tmsgSendRsp(&rsp);
  return code;
}
1198

1199
// Handles the "last ts" pull request: creates an inner reader task with the last-ts options,
// collects the timestamps (virtual-table path when the reader has a uidList, regular path
// otherwise), serialises them with buildTsRsp() and sends the response.
// The response is sent on failure as well, carrying the error code.
static int32_t vnodeProcessStreamLastTsReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  SStreamReaderTaskInner* pTaskInner = NULL;
  SStreamTsResponse       lastTsRsp = {0};
  void*                   buf = NULL;
  size_t                  size = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;

  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStreamTriggerReaderTaskInnerOptions taskOpts = {0};
  STREAM_CHECK_RET_GOTO(createOptionsForLastTs(&taskOpts, sStreamReaderInfo));
  SStorageAPI api = {0};
  initStorageAPI(&api);
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &taskOpts, &pTaskInner, NULL, NULL, &api));

  // report the vnode's applied version along with the timestamps
  lastTsRsp.ver = pVnode->state.applied;
  if (sStreamReaderInfo->uidList == NULL) {
    STREAM_CHECK_RET_GOTO(processTsNonVTable(pVnode, &lastTsRsp, sStreamReaderInfo, pTaskInner));
  } else {
    STREAM_CHECK_RET_GOTO(processTsVTable(pVnode, &lastTsRsp, sStreamReaderInfo, pTaskInner, -1));
  }
  ST_TASK_DLOG("vgId:%d %s get result", TD_VID(pVnode), __func__);
  STREAM_CHECK_RET_GOTO(buildTsRsp(&lastTsRsp, &buf, &size));

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  taosArrayDestroy(lastTsRsp.tsInfo);
  releaseStreamTask(&pTaskInner);
  return code;
}
1237

1238
// Handles the "first ts" pull request: creates an inner reader task with the first-ts options
// built from req->firstTsReq.startTime / ver, collects the timestamps (virtual-table path when
// the reader has a uidList, regular path otherwise), serialises them with buildTsRsp() and
// sends the response. The response is sent on failure as well, carrying the error code.
static int32_t vnodeProcessStreamFirstTsReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  SStreamReaderTaskInner* pTaskInner = NULL;
  SStreamTsResponse       firstTsRsp = {0};
  void*                   buf = NULL;
  size_t                  size = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStreamTriggerReaderTaskInnerOptions taskOpts = {0};
  STREAM_CHECK_RET_GOTO(createOptionsForFirstTs(&taskOpts, sStreamReaderInfo, req->firstTsReq.startTime, req->firstTsReq.ver));
  SStorageAPI api = {0};
  initStorageAPI(&api);
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &taskOpts, &pTaskInner, NULL, NULL, &api));

  // report the vnode's applied version along with the timestamps
  firstTsRsp.ver = pVnode->state.applied;
  if (sStreamReaderInfo->uidList == NULL) {
    STREAM_CHECK_RET_GOTO(processTsNonVTable(pVnode, &firstTsRsp, sStreamReaderInfo, pTaskInner));
  } else {
    STREAM_CHECK_RET_GOTO(processTsVTable(pVnode, &firstTsRsp, sStreamReaderInfo, pTaskInner, req->firstTsReq.ver));
  }

  ST_TASK_DLOG("vgId:%d %s get result", TD_VID(pVnode), __func__);
  STREAM_CHECK_RET_GOTO(buildTsRsp(&firstTsRsp, &buf, &size));

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  taosArrayDestroy(firstTsRsp.tsInfo);
  releaseStreamTask(&pTaskInner);
  return code;
}
1274

1275
// Handles the tsdb meta pull request: returns one row per data block with
// skey, ekey, uid, gid (only when the reader has no uidList) and row count.
//   - For STRIGGER_PULL_TSDB_META a new inner task is created and registered in
//     sStreamReaderInfo->streamTaskMap under the session key; any other request type looks
//     up the task registered earlier under that key.
//   - At most STREAM_RETURN_ROWS_NUM rows are returned per call. When the scan is exhausted
//     the task is removed from the map.
// The response is always sent; on failure it carries the error code.
static int32_t vnodeProcessStreamTsdbMetaReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;
  SStreamTriggerReaderTaskInnerOptions options = {0};

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStreamReaderTaskInner* pTaskInner = NULL;
  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_META);

  if (req->base.type == STRIGGER_PULL_TSDB_META) {
    SStreamTriggerReaderTaskInnerOptions optionsTs = {0};

    // the reader task itself only scans the ts column (onlyTs == true)
    STREAM_CHECK_RET_GOTO(createOptionsForTsdbMeta(&optionsTs, sStreamReaderInfo, req->tsdbMetaReq.startTime,
      req->tsdbMetaReq.endTime, req->tsdbMetaReq.gid, req->tsdbMetaReq.order, req->tsdbMetaReq.ver, true));
    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &optionsTs, &pTaskInner, NULL, sStreamReaderInfo->groupIdMap, &api));

    code = taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner));
    if (code != 0) {
      // the map did not take the task, so nothing else owns it: release it here instead of leaking it
      releaseStreamTask(&pTaskInner);
      lino = __LINE__;
      goto end;
    }

    // the full (onlyTs == false) schema describes the rows of the response block
    STREAM_CHECK_RET_GOTO(createOptionsForTsdbMeta(&options, sStreamReaderInfo, req->tsdbMetaReq.startTime,
      req->tsdbMetaReq.endTime, req->tsdbMetaReq.gid, req->tsdbMetaReq.order, req->tsdbMetaReq.ver, false));
    STREAM_CHECK_RET_GOTO(createDataBlockForStream(options.schemas, &pTaskInner->pResBlockDst));
  } else {
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  }

  blockDataCleanup(pTaskInner->pResBlockDst);
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pTaskInner->pResBlockDst, STREAM_RETURN_ROWS_NUM));
  bool hasNext = true;
  while (true) {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
    if (!hasNext) {
      break;
    }
    // only the block's meta info is needed, so the block itself is released right away
    pTaskInner->api.tsdReader.tsdReaderReleaseDataBlock(pTaskInner->pReader);
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);

    // column order must match the schema built by createOptionsForTsdbMeta(onlyTs == false)
    int32_t index = 0;
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.window.skey));
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.window.ekey));
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.id.uid));
    if (sStreamReaderInfo->uidList == NULL) {
      STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.id.groupId));
    }
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.rows));

    stDebug("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
    pTaskInner->pResBlockDst->info.rows++;
    // stop once the response block is full; hasNext stays true so the task is kept in the map
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
      break;
    }
  }

  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
  if (!hasNext) {
    // scan exhausted: the task is no longer needed for this session
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  taosArrayDestroy(options.schemas);
  return code;
}
1352

1353
/*
 * Handle a TSDB "ts data" pull request.
 *
 * Builds a one-shot reader task from the request (ver/skey/ekey/suid/uid), pulls every
 * available block, applies tag processing and the task filter, merges the blocks into
 * pTaskInner->pResBlockDst, then transforms the merged data into a block created from
 * sStreamReaderInfo->tsBlock and serializes it as the response payload.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP is always sent, carrying `code` on failure.
 * Returns the final status code.
 */
static int32_t vnodeProcessStreamTsdbTsDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  SSDataBlock*            pBlockRes = NULL;
  SStreamReaderTaskInner* pTaskInner = NULL;
  size_t                  size = 0;
  void*                   buf = NULL;
  int32_t                 lino = 0;
  int32_t                 code = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  BUILD_OPTION(options, sStreamReaderInfo, req->tsdbTsDataReq.ver, true, TSDB_ORDER_ASC, req->tsdbTsDataReq.skey, req->tsdbTsDataReq.ekey,
               sStreamReaderInfo->triggerCols, false, STREAM_SCAN_ALL, 0, true, NULL);
  reSetUid(&options, req->tsdbTsDataReq.suid, req->tsdbTsDataReq.uid);

  SStorageAPI storageApi = {0};
  initStorageAPI(&storageApi);

  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, NULL, &storageApi));
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->tsBlock, false, &pBlockRes));

  // Drain the reader: one iteration per available data block.
  for (;;) {
    bool more = false;
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &more));
    if (!more) {
      break;
    }
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);

    SSDataBlock* pData = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pData));
    if (pData != NULL && pData->info.rows > 0) {
      STREAM_CHECK_RET_GOTO(processTag(pVnode, sStreamReaderInfo->pExprInfo, sStreamReaderInfo->numOfExpr, &storageApi, pData));
    }

    STREAM_CHECK_RET_GOTO(qStreamFilter(pData, pTaskInner->pFilterInfo));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pData));
    ST_TASK_DLOG("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
                 TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
                 pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
  }

  blockDataTransform(pBlockRes, pTaskInner->pResBlockDst);

  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);

  // The response is sent on both the success and the failure path.
  SRpcMsg rsp = {0};
  rsp.msgType = TDMT_STREAM_TRIGGER_PULL_RSP;
  rsp.info = pMsg->info;
  rsp.pCont = buf;
  rsp.contLen = size;
  rsp.code = code;
  tmsgSendRsp(&rsp);

  blockDataDestroy(pBlockRes);
  releaseStreamTask(&pTaskInner);
  return code;
}
1410

1411
/*
 * Handle a TSDB trigger-data pull request and its follow-up requests.
 *
 * When req->base.type is STRIGGER_PULL_TSDB_TRIGGER_DATA a new reader task is created and
 * stored in sStreamReaderInfo->streamTaskMap under the session key; for any other type the
 * task previously stored under that key is looked up (TSDB_CODE_STREAM_NO_CONTEXT if absent).
 * The merged result is serialized and sent as a TDMT_STREAM_TRIGGER_PULL_RSP; the task is
 * removed from the map once the reader reports that no more data is available.
 *
 * A response is always sent. Returns the final status code.
 */
static int32_t vnodeProcessStreamTsdbTriggerDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  size_t  size = 0;
  void*   buf = NULL;
  int32_t lino = 0;
  int32_t code = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  SStreamReaderTaskInner* pTaskInner = NULL;
  void*                   pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  int64_t sessionKey = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_TRIGGER_DATA);

  if (req->base.type != STRIGGER_PULL_TSDB_TRIGGER_DATA) {
    // Follow-up request: continue with the task saved by the initial request.
    void** ppSaved = taosHashGet(sStreamReaderInfo->streamTaskMap, &sessionKey, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(ppSaved, TSDB_CODE_STREAM_NO_CONTEXT);
    pTaskInner = *(SStreamReaderTaskInner**)ppSaved;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  } else {
    // Initial request: build the task and register it for later follow-ups.
    BUILD_OPTION(options, sStreamReaderInfo, req->tsdbTriggerDataReq.ver, true, req->tsdbTriggerDataReq.order, req->tsdbTriggerDataReq.startTime, INT64_MAX,
                 sStreamReaderInfo->triggerCols, false, (req->tsdbTriggerDataReq.gid != 0 ? STREAM_SCAN_GROUP_ONE_BY_ONE : STREAM_SCAN_ALL),
                 req->tsdbTriggerDataReq.gid, true, NULL);
    SStorageAPI storageApi = {0};
    initStorageAPI(&storageApi);
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock,
                                           sStreamReaderInfo->groupIdMap, &storageApi));
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &sessionKey, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
  }

  blockDataCleanup(pTaskInner->pResBlockDst);

  bool more = true;
  for (;;) {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &more));
    if (!more) {
      break;
    }
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
    pTaskInner->pResBlockDst->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);

    SSDataBlock* pData = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pData));
    if (pData != NULL && pData->info.rows > 0) {
      STREAM_CHECK_RET_GOTO(
          processTag(pVnode, sStreamReaderInfo->pExprInfo, sStreamReaderInfo->numOfExpr, &pTaskInner->api, pData));
    }
    STREAM_CHECK_RET_GOTO(qStreamFilter(pData, pTaskInner->pFilterInfo));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pData));
    ST_TASK_DLOG("vgId:%d %s get skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
                 TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
                 pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);

    // todo: this threshold holds for any non-negative row count, so the loop
    // currently returns after the first merged block.
    if (pTaskInner->pResBlockDst->info.rows >= 0) {
      break;
    }
  }

  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
  if (!more) {
    // Reader exhausted: forget the saved task.
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &sessionKey, LONG_BYTES));
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);

  SRpcMsg rsp = {0};
  rsp.msgType = TDMT_STREAM_TRIGGER_PULL_RSP;
  rsp.info = pMsg->info;
  rsp.pCont = buf;
  rsp.contLen = size;
  rsp.code = code;
  tmsgSendRsp(&rsp);

  return code;
}
1481

1482
/*
 * Handle a TSDB calc-data pull request and its follow-up requests.
 *
 * For STRIGGER_PULL_TSDB_CALC_DATA a reader task is created from the request
 * (ver/skey/ekey/gid) and stored in sStreamReaderInfo->streamTaskMap under the session key;
 * for any other type the stored task is reused. Blocks are filtered and merged until the
 * reader is exhausted or STREAM_RETURN_ROWS_NUM rows are collected, then transformed into a
 * block created from sStreamReaderInfo->calcResBlock and serialized as the response payload.
 *
 * Fails with TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN when triggerCols is NULL.
 * A TDMT_STREAM_TRIGGER_PULL_RSP is always sent. Returns the final status code.
 */
static int32_t vnodeProcessStreamTsdbCalcDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  SSDataBlock* pBlockRes = NULL;
  size_t       size = 0;
  void*        buf = NULL;
  int32_t      lino = 0;
  int32_t      code = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, skey:%"PRId64",ekey:%"PRId64",gid:%"PRId64, TD_VID(pVnode), __func__,
               req->tsdbCalcDataReq.skey, req->tsdbCalcDataReq.ekey, req->tsdbCalcDataReq.gid);

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->triggerCols, TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN);

  SStreamReaderTaskInner* pTaskInner = NULL;
  int64_t                 sessionKey = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_CALC_DATA);

  if (req->base.type != STRIGGER_PULL_TSDB_CALC_DATA) {
    // Follow-up request: pick up the task saved by the initial request.
    void** ppSaved = taosHashGet(sStreamReaderInfo->streamTaskMap, &sessionKey, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(ppSaved, TSDB_CODE_STREAM_NO_CONTEXT);
    pTaskInner = *(SStreamReaderTaskInner**)ppSaved;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  } else {
    // Initial request: create the task and register it for follow-ups.
    BUILD_OPTION(options, sStreamReaderInfo, req->tsdbCalcDataReq.ver, true, TSDB_ORDER_ASC, req->tsdbCalcDataReq.skey, req->tsdbCalcDataReq.ekey,
                 sStreamReaderInfo->triggerCols, false, STREAM_SCAN_GROUP_ONE_BY_ONE, req->tsdbCalcDataReq.gid, true, NULL);
    SStorageAPI storageApi = {0};
    initStorageAPI(&storageApi);
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, NULL, &storageApi));

    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &sessionKey, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
  }

  blockDataCleanup(pTaskInner->pResBlockDst);

  bool more = true;
  for (;;) {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &more));
    if (!more) {
      break;
    }
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);

    SSDataBlock* pData = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pData));
    STREAM_CHECK_RET_GOTO(qStreamFilter(pData, pTaskInner->pFilterInfo));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pData));

    // Cap the size of a single response; the remainder is served by follow-up requests.
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
      break;
    }
  }

  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->calcResBlock, false, &pBlockRes));
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlockRes, pTaskInner->pResBlockDst->info.capacity));
  blockDataTransform(pBlockRes, pTaskInner->pResBlockDst);
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlockRes->info.rows);
  if (!more) {
    // Reader exhausted: forget the saved task.
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &sessionKey, LONG_BYTES));
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);

  SRpcMsg rsp = {0};
  rsp.msgType = TDMT_STREAM_TRIGGER_PULL_RSP;
  rsp.info = pMsg->info;
  rsp.pCont = buf;
  rsp.contLen = size;
  rsp.code = code;
  tmsgSendRsp(&rsp);

  blockDataDestroy(pBlockRes);
  return code;
}
1550

1551
/*
 * Handle a TSDB data pull request for a single table (req->tsdbDataReq.uid) and its
 * follow-up requests.
 *
 * For STRIGGER_PULL_TSDB_DATA a reader task is built from the request, its query condition is
 * re-initialised and a tsdb reader is opened on that one table; the task is stored in
 * sStreamReaderInfo->streamTaskMap keyed by the table uid. For any other type the stored task
 * is reused. Blocks are merged until the reader is exhausted or STREAM_RETURN_ROWS_NUM rows
 * are collected, then serialized as the response payload.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP is always sent. Returns the final status code.
 */
static int32_t vnodeProcessStreamTsdbVirtalDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  size_t  size = 0;
  void*   buf = NULL;
  int32_t lino = 0;
  int32_t code = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStreamReaderTaskInner* pTaskInner = NULL;
  int64_t                 tableKey = req->tsdbDataReq.uid;

  if (req->base.type != STRIGGER_PULL_TSDB_DATA) {
    // Follow-up request: continue with the task saved for this table uid.
    void** ppSaved = taosHashGet(sStreamReaderInfo->streamTaskMap, &tableKey, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(ppSaved, TSDB_CODE_STREAM_NO_CONTEXT);
    pTaskInner = *(SStreamReaderTaskInner**)ppSaved;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  } else {
    SStreamTriggerReaderTaskInnerOptions taskOpts = {0};

    STREAM_CHECK_RET_GOTO(createOptionsForTsdbData(pVnode, &taskOpts, sStreamReaderInfo, req->tsdbDataReq.uid,
                                                   req->tsdbDataReq.cids, req->tsdbDataReq.order, req->tsdbDataReq.skey,
                                                   req->tsdbDataReq.ekey, req->tsdbDataReq.ver));
    reSetUid(&taskOpts, req->tsdbDataReq.suid, req->tsdbDataReq.uid);

    SStorageAPI storageApi = {0};
    initStorageAPI(&storageApi);
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &taskOpts, &pTaskInner, NULL, NULL, &storageApi));

    // Rebuild the query condition from the task's own options and open a reader
    // restricted to the single requested table.
    STableKeyInfo singleTable = {.uid = req->tsdbDataReq.uid};
    cleanupQueryTableDataCond(&pTaskInner->cond);
    STREAM_CHECK_RET_GOTO(qStreamInitQueryTableDataCond(&pTaskInner->cond, pTaskInner->options.order, pTaskInner->options.schemas,
                                                        pTaskInner->options.isSchema, pTaskInner->options.twindows,
                                                        pTaskInner->options.suid, pTaskInner->options.ver));
    STREAM_CHECK_RET_GOTO(pTaskInner->api.tsdReader.tsdReaderOpen(pVnode, &pTaskInner->cond, &singleTable, 1, pTaskInner->pResBlock,
                                                                  (void**)&pTaskInner->pReader, pTaskInner->idStr, NULL));

    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &tableKey, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
    STREAM_CHECK_RET_GOTO(createOneDataBlock(pTaskInner->pResBlock, false, &pTaskInner->pResBlockDst));
  }

  blockDataCleanup(pTaskInner->pResBlockDst);

  bool more = true;
  for (;;) {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &more));
    if (!more) {
      break;
    }

    SSDataBlock* pData = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pData));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pData));

    // Cap the size of a single response; the remainder is served by follow-up requests.
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
      break;
    }
  }

  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
  if (!more) {
    // Reader exhausted: forget the saved task.
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &tableKey, LONG_BYTES));
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);

  SRpcMsg rsp = {0};
  rsp.msgType = TDMT_STREAM_TRIGGER_PULL_RSP;
  rsp.info = pMsg->info;
  rsp.pCont = buf;
  rsp.contLen = size;
  rsp.code = code;
  tmsgSendRsp(&rsp);

  return code;
}
1622

1623
/*
 * Handle a WAL-meta pull request: scan the WAL starting from req->walMetaReq.lastVer and
 * reply with the collected meta block.
 *
 * For non-virtual-table streams (uidList == NULL) a table list is built from the reader
 * info and passed to scanWal(); otherwise sStreamReaderInfo->uidHash is used.
 *
 * When the resulting block has zero rows, the response carries TSDB_CODE_STREAM_NO_DATA and
 * an 8-byte payload holding the last WAL version reported by scanWal(); in that case the
 * function itself returns 0.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP is always sent. Returns the final status code.
 */
static int32_t vnodeProcessStreamWalMetaReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t      code = 0;
  int32_t      lino = 0;
  void*        buf = NULL;
  size_t       size = 0;
  SSDataBlock* pBlock = NULL;
  void*        pTableList = NULL;
  SNodeList*   groupNew = NULL;
  int64_t      lastVer = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, request paras lastVer:%" PRId64 ",ctime:%" PRId64, TD_VID(pVnode), __func__,
               req->walMetaReq.lastVer, req->walMetaReq.ctime);

  bool isVTable = sStreamReaderInfo->uidList != NULL;
  if (!isVTable) {
    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(nodesCloneList(sStreamReaderInfo->partitionCols, &groupNew));
    STREAM_CHECK_RET_GOTO(qStreamCreateTableListForReader(
        pVnode, sStreamReaderInfo->suid, sStreamReaderInfo->uid, sStreamReaderInfo->tableType,
        groupNew, false, sStreamReaderInfo->pTagCond, sStreamReaderInfo->pTagIndexCond, &api,
        &pTableList, sStreamReaderInfo->groupIdMap));
  }

  STREAM_CHECK_RET_GOTO(createBlockForWalMeta(&pBlock, isVTable));
  STREAM_CHECK_RET_GOTO(scanWal(pVnode, isVTable ? sStreamReaderInfo->uidHash : pTableList, isVTable, pBlock,
                                req->walMetaReq.lastVer, sStreamReaderInfo->deleteReCalc,
                                sStreamReaderInfo->deleteOutTbl, req->walMetaReq.ctime, &lastVer));

  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlock->info.rows);
  STREAM_CHECK_RET_GOTO(buildRsp(pBlock, &buf, &size));
  printDataBlock(pBlock, __func__, "");

end:
  if (pBlock != NULL && pBlock->info.rows == 0) {
    // If buildRsp() succeeded it may already have handed us a payload; release it
    // before replacing it so it is not leaked. Only done on the success path, where
    // `buf` is known to be either NULL or a live buffer.
    if (code == 0 && buf != NULL) {
      rpcFreeCont(buf);
    }
    buf = rpcMallocCont(sizeof(int64_t));
    if (buf != NULL) {
      code = TSDB_CODE_STREAM_NO_DATA;
      *(int64_t*)buf = lastVer;
      size = sizeof(int64_t);
    } else {
      // Allocation failed: report the error instead of writing through NULL.
      code = terrno;
      size = 0;
    }
  }
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  if (code == TSDB_CODE_STREAM_NO_DATA) {
    // "No data" is only a signal to the requester, not a local failure.
    code = 0;
  }
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  nodesDestroyList(groupNew);
  blockDataDestroy(pBlock);
  qStreamDestroyTableList(pTableList);

  return code;
}
1678

1679
/*
 * Handle a WAL data pull request. The request type selects how the WAL entry at
 * req->walDataReq.ver is turned into a data block:
 *   - STRIGGER_PULL_WAL_DATA         -> processWalVerDataVTable() with the requested cids
 *   - STRIGGER_PULL_WAL_CALC_DATA    -> processWalVerData() against calcResBlock
 *   - STRIGGER_PULL_WAL_TRIGGER_DATA -> processWalVerData() against triggerResBlock
 *   - STRIGGER_PULL_WAL_TS_DATA      -> processWalVerData() against tsBlock
 * Any other type is rejected with TSDB_CODE_INVALID_PARA.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP is always sent. Returns the final status code.
 */
static int32_t vnodeProcessStreamWalDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t      code = 0;
  int32_t      lino = 0;
  void*        buf = NULL;
  size_t       size = 0;
  SSDataBlock* pBlock = NULL;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, request type:%d skey:%" PRId64 ",ekey:%" PRId64 ",uid:%" PRId64 ",ver:%" PRId64, TD_VID(pVnode),
               __func__, req->walDataReq.base.type, req->walDataReq.skey, req->walDataReq.ekey, req->walDataReq.uid, req->walDataReq.ver);

  STimeWindow window = {.skey = req->walDataReq.skey, .ekey = req->walDataReq.ekey};
  if (req->walDataReq.base.type == STRIGGER_PULL_WAL_DATA) {
    STREAM_CHECK_RET_GOTO(processWalVerDataVTable(pVnode, req->walDataReq.cids, req->walDataReq.ver,
                                                  req->walDataReq.uid, &window, &pBlock));
  } else if (req->walDataReq.base.type == STRIGGER_PULL_WAL_CALC_DATA) {
    STREAM_CHECK_RET_GOTO(processWalVerData(pVnode, sStreamReaderInfo, req->walDataReq.ver, false,
                                            req->walDataReq.uid, &window, sStreamReaderInfo->calcResBlock, &pBlock));
  } else if (req->walDataReq.base.type == STRIGGER_PULL_WAL_TRIGGER_DATA) {
    STREAM_CHECK_RET_GOTO(processWalVerData(pVnode, sStreamReaderInfo, req->walDataReq.ver, true,
                                            req->walDataReq.uid, &window, sStreamReaderInfo->triggerResBlock, &pBlock));
  } else if (req->walDataReq.base.type == STRIGGER_PULL_WAL_TS_DATA) {
    STREAM_CHECK_RET_GOTO(processWalVerData(pVnode, sStreamReaderInfo, req->walDataReq.ver, false,
                                            req->walDataReq.uid, &window, sStreamReaderInfo->tsBlock, &pBlock));
  } else {
    // No handler produced a block for this type; fail explicitly rather than
    // continuing with pBlock == NULL.
    STREAM_CHECK_CONDITION_GOTO(true, TSDB_CODE_INVALID_PARA);
  }

  // pBlock is not guaranteed non-NULL by anything visible here, so do not dereference it blindly.
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__,
               (pBlock != NULL ? pBlock->info.rows : (int64_t)0));
  printDataBlock(pBlock, __func__, "");
  STREAM_CHECK_RET_GOTO(buildRsp(pBlock, &buf, &size));
end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);

  blockDataDestroy(pBlock);
  return code;
}
1719

1720
/*
 * Handle a group-column-value pull request: look up the group info stored in
 * sStreamReaderInfo->groupIdMap for req->groupColValueReq.gid, serialize it and send it
 * back as a TDMT_STREAM_TRIGGER_PULL_RSP.
 *
 * Fails with TSDB_CODE_STREAM_NO_CONTEXT when the gid is not present in the map. On any
 * failure the payload is released and an empty response carrying `code` is sent.
 * Returns the final status code.
 */
static int32_t vnodeProcessStreamGroupColValueReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, request gid:%" PRId64, TD_VID(pVnode), __func__, req->groupColValueReq.gid);

  SArray** gInfo = taosHashGet(sStreamReaderInfo->groupIdMap, &req->groupColValueReq.gid, POINTER_BYTES);
  STREAM_CHECK_NULL_GOTO(gInfo, TSDB_CODE_STREAM_NO_CONTEXT);
  SStreamGroupInfo pGroupInfo = {0};
  pGroupInfo.gInfo = *gInfo;

  // Keep the serializer result in a signed variable: storing it in the unsigned `size`
  // would make the `< 0` error checks always false and turn a negative error code into
  // a huge allocation size.
  // NOTE(review): assumes tSerializeSStreamGroupInfo() returns a signed 32-bit length or a
  // negative error code - confirm against its declaration.
  int32_t len = tSerializeSStreamGroupInfo(NULL, 0, &pGroupInfo, TD_VID(pVnode));
  STREAM_CHECK_CONDITION_GOTO(len < 0, len);
  buf = rpcMallocCont(len);
  STREAM_CHECK_NULL_GOTO(buf, terrno);
  len = tSerializeSStreamGroupInfo(buf, len, &pGroupInfo, TD_VID(pVnode));
  STREAM_CHECK_CONDITION_GOTO(len < 0, len);
  size = (size_t)len;
end:
  if (code != 0) {
    rpcFreeCont(buf);
    buf = NULL;
    size = 0;
  }
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);

  return code;
}
1754

1755
/*
 * Handle a virtual-table-info pull request: build the table list for the stream reader and,
 * for every table in it, report its uid, group id and column references.
 *
 * Column references are taken from the table's meta entry:
 *   - if the request asks for exactly one column and it is PRIMARYKEY_TIMESTAMP_COL_ID,
 *     all column references of the table are copied;
 *   - otherwise one slot per requested column id is produced, filled from the first matching
 *     reference that has `hasRef` set (slots without a match stay zeroed).
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP is always sent. Returns the final status code.
 */
static int32_t vnodeProcessStreamVTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t              code = 0;
  int32_t              lino = 0;
  void*                buf = NULL;
  size_t               size = 0;
  SStreamMsgVTableInfo vTableInfo = {0};
  SMetaReader          metaReader = {0};
  SNodeList*           groupNew = NULL;
  void*                pTableList = NULL;
  SStorageAPI          api = {0};
  initStorageAPI(&api);

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  STREAM_CHECK_RET_GOTO(nodesCloneList(sStreamReaderInfo->partitionCols, &groupNew));
  STREAM_CHECK_RET_GOTO(qStreamCreateTableListForReader(
      pVnode, sStreamReaderInfo->suid, sStreamReaderInfo->uid, sStreamReaderInfo->tableType,
      groupNew, true, sStreamReaderInfo->pTagCond, sStreamReaderInfo->pTagIndexCond, &api,
      &pTableList, sStreamReaderInfo->groupIdMap));

  SArray* cids = req->virTableInfoReq.cids;
  STREAM_CHECK_NULL_GOTO(cids, terrno);

  SArray* pTableListArray = qStreamGetTableArrayList(pTableList);
  STREAM_CHECK_NULL_GOTO(pTableListArray, terrno);

  vTableInfo.infos = taosArrayInit(taosArrayGetSize(pTableListArray), sizeof(VTableInfo));
  STREAM_CHECK_NULL_GOTO(vTableInfo.infos, terrno);
  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);

  for (size_t i = 0; i < taosArrayGetSize(pTableListArray); i++) {
    STableKeyInfo* pKeyInfo = taosArrayGet(pTableListArray, i);
    if (pKeyInfo == NULL) {
      continue;
    }
    VTableInfo* vTable = taosArrayReserve(vTableInfo.infos, 1);
    STREAM_CHECK_NULL_GOTO(vTable, terrno);
    // Start from a clean slot so that an early exit below never leaves the cleanup code
    // looking at an uninitialised pColRef pointer or column count.
    memset(vTable, 0, sizeof(VTableInfo));
    vTable->uid = pKeyInfo->uid;
    vTable->gId = pKeyInfo->groupId;

    // Do not read metaReader.me if the lookup failed: it would still describe the
    // previous table, whose decoder buffers were already cleared.
    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, pKeyInfo->uid));

    if (taosArrayGetSize(cids) == 1 && *(col_id_t*)taosArrayGet(cids, 0) == PRIMARYKEY_TIMESTAMP_COL_ID) {
      // Only the timestamp column was requested: hand back every column reference.
      vTable->cols.pColRef = taosMemoryCalloc(metaReader.me.colRef.nCols, sizeof(SColRef));
      STREAM_CHECK_CONDITION_GOTO(vTable->cols.pColRef == NULL && metaReader.me.colRef.nCols > 0, terrno);
      vTable->cols.nCols = metaReader.me.colRef.nCols;
      vTable->cols.version = metaReader.me.colRef.version;
      for (size_t j = 0; j < metaReader.me.colRef.nCols; j++) {
        memcpy(vTable->cols.pColRef + j, &metaReader.me.colRef.pColRef[j], sizeof(SColRef));
      }
    } else {
      // One output slot per requested column id, in request order.
      vTable->cols.pColRef = taosMemoryCalloc(taosArrayGetSize(cids), sizeof(SColRef));
      STREAM_CHECK_CONDITION_GOTO(vTable->cols.pColRef == NULL && taosArrayGetSize(cids) > 0, terrno);
      vTable->cols.nCols = taosArrayGetSize(cids);
      vTable->cols.version = metaReader.me.colRef.version;
      for (size_t k = 0; k < taosArrayGetSize(cids); k++) {
        for (size_t j = 0; j < metaReader.me.colRef.nCols; j++) {
          if (metaReader.me.colRef.pColRef[j].hasRef &&
              metaReader.me.colRef.pColRef[j].id == *(col_id_t*)taosArrayGet(cids, k)) {
            memcpy(vTable->cols.pColRef + k, &metaReader.me.colRef.pColRef[j], sizeof(SColRef));
            break;
          }
        }
      }
    }
    tDecoderClear(&metaReader.coder);
  }
  ST_TASK_DLOG("vgId:%d %s end", TD_VID(pVnode), __func__);
  STREAM_CHECK_RET_GOTO(buildVTableInfoRsp(&vTableInfo, &buf, &size));

end:
  nodesDestroyList(groupNew);
  qStreamDestroyTableList(pTableList);
  tDestroySStreamMsgVTableInfo(&vTableInfo);
  api.metaReaderFn.clearReader(&metaReader);
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  return code;
}
1835

1836
/*
 * Handle an original-table-info pull request: for every (refTableName, refColName) pair in
 * req->origTableInfoReq.cols, resolve the table by name and report its uid, its super-table
 * uid (0 for a normal table) and the column id of the referenced column.
 *
 * For a child table the schema is taken from its super table; for a normal table from the
 * table itself. Any other table type is rejected with TSDB_CODE_INVALID_PARA. A column name
 * that is not found in the schema is logged and reported with cid 0.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP is always sent. Returns the final status code.
 */
static int32_t vnodeProcessStreamOTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req) {
  int32_t                   code = 0;
  int32_t                   lino = 0;
  void*                     buf = NULL;
  size_t                    size = 0;
  SSTriggerOrigTableInfoRsp oTableInfo = {0};
  SMetaReader               metaReader = {0};
  // NOTE(review): streamId is not referenced directly below; presumably the stsDebug()
  // macro picks it up by name - confirm before removing.
  int64_t streamId = req->base.streamId;
  stsDebug("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStorageAPI api = {0};
  initStorageAPI(&api);

  SArray* cols = req->origTableInfoReq.cols;
  STREAM_CHECK_NULL_GOTO(cols, terrno);

  oTableInfo.cols = taosArrayInit(taosArrayGetSize(cols), sizeof(OTableInfoRsp));

  STREAM_CHECK_NULL_GOTO(oTableInfo.cols, terrno);

  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
  for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
    OTableInfo*    oInfo = taosArrayGet(cols, i);
    OTableInfoRsp* vTableInfo = taosArrayReserve(oTableInfo.cols, 1);
    STREAM_CHECK_NULL_GOTO(oInfo, terrno);
    STREAM_CHECK_NULL_GOTO(vTableInfo, terrno);
    // Zero the slot so that `cid == 0` below reliably means "column not found"
    // instead of depending on whatever the reserved memory contained.
    memset(vTableInfo, 0, sizeof(OTableInfoRsp));
    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByName(&metaReader, oInfo->refTableName));
    vTableInfo->uid = metaReader.me.uid;
    stsDebug("vgId:%d %s uid:%"PRId64, TD_VID(pVnode), __func__, vTableInfo->uid);

    SSchemaWrapper* sSchemaWrapper = NULL;
    if (metaReader.me.type == TD_CHILD_TABLE) {
      // A child table's columns are described by its super table: re-read that entry.
      int64_t suid = metaReader.me.ctbEntry.suid;
      vTableInfo->suid = suid;
      tDecoderClear(&metaReader.coder);
      STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, suid));
      sSchemaWrapper = &metaReader.me.stbEntry.schemaRow;
    } else if (metaReader.me.type == TD_NORMAL_TABLE) {
      vTableInfo->suid = 0;
      sSchemaWrapper = &metaReader.me.ntbEntry.schemaRow;
    } else {
      stError("invalid table type:%d", metaReader.me.type);
    }
    // Unsupported table type: there is no schema to search, so fail the request
    // instead of dereferencing a NULL schema wrapper.
    STREAM_CHECK_NULL_GOTO(sSchemaWrapper, TSDB_CODE_INVALID_PARA);

    for (size_t j = 0; j < sSchemaWrapper->nCols; j++) {
      SSchema* s = sSchemaWrapper->pSchema + j;
      if (strcmp(s->name, oInfo->refColName) == 0) {
        vTableInfo->cid = s->colId;
        break;
      }
    }
    if (vTableInfo->cid == 0) {
      stError("vgId:%d %s, not found col %s in table %s", TD_VID(pVnode), __func__, oInfo->refColName,
              oInfo->refTableName);
    }
    tDecoderClear(&metaReader.coder);
  }

  STREAM_CHECK_RET_GOTO(buildOTableInfoRsp(&oTableInfo, &buf, &size));

end:
  tDestroySTriggerOrigTableInfoRsp(&oTableInfo);
  api.metaReaderFn.clearReader(&metaReader);
  STREAM_PRINT_LOG_END(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  return code;
}
1905

1906
/**
 * Handle a STRIGGER_PULL_VTABLE_PSEUDO_COL request.
 *
 * Looks up the virtual table named by req->virTablePseudoColReq.uid and builds a one-row data
 * block with one column per requested column id (req->virTablePseudoColReq.cids):
 *   - column id -1 is filled with the table name;
 *   - any other id is looked up in the super table's tag schema and filled with the child
 *     table's tag value; ids missing from the tag schema become NULL-typed columns.
 * A virtual normal table has no super table, so only the table-name column (-1) is produced.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP response is always sent, carrying either the serialized block
 * or the error code.
 *
 * @param pVnode vnode whose meta store is queried
 * @param pMsg   incoming rpc message; pMsg->info is copied into the response
 * @param req    decoded pull request (reads base.streamId and virTablePseudoColReq)
 * @return 0 on success, otherwise the error code that was also put into the response
 */
static int32_t vnodeProcessStreamVTableTagInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req) {
  int32_t      code = 0;
  int32_t      lino = 0;
  void*        buf = NULL;
  size_t       size = 0;
  SSDataBlock* pBlock = NULL;

  SMetaReader metaReader = {0};
  SMetaReader metaReaderStable = {0};
  // NOTE(review): streamId is not referenced directly below; presumably the stsDebug macro picks it up.
  int64_t streamId = req->base.streamId;
  stsDebug("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStorageAPI api = {0};
  initStorageAPI(&api);

  SArray* cols = req->virTablePseudoColReq.cids;
  STREAM_CHECK_NULL_GOTO(cols, terrno);

  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
  STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, req->virTablePseudoColReq.uid));

  STREAM_CHECK_CONDITION_GOTO(
      metaReader.me.type != TD_VIRTUAL_CHILD_TABLE && metaReader.me.type != TD_VIRTUAL_NORMAL_TABLE,
      TSDB_CODE_INVALID_PARA);

  STREAM_CHECK_RET_GOTO(createDataBlock(&pBlock));
  if (metaReader.me.type == TD_VIRTUAL_NORMAL_TABLE) {
    // The request must carry at least one column id and the first one must be the table name (-1).
    // `||` short-circuits on the size test, so element 0 is only read when it exists.
    STREAM_CHECK_CONDITION_GOTO(taosArrayGetSize(cols) < 1 || *(col_id_t*)taosArrayGet(cols, 0) != -1,
                                TSDB_CODE_INVALID_PARA);
    SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN, -1);
    STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, 1));
    pBlock->info.rows = 1;
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, 0);
    STREAM_CHECK_NULL_GOTO(pDst, terrno);
    STREAM_CHECK_RET_GOTO(varColSetVarData(pDst, 0, metaReader.me.name, strlen(metaReader.me.name), false));
  } else if (metaReader.me.type == TD_VIRTUAL_CHILD_TABLE) {
    int64_t suid = metaReader.me.ctbEntry.suid;
    // Release the child reader's lock before a second reader is opened on the super table.
    api.metaReaderFn.readerReleaseLock(&metaReader);
    api.metaReaderFn.initReader(&metaReaderStable, pVnode, META_READER_LOCK, &api.metaFn);

    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReaderStable, suid));
    SSchemaWrapper* sSchemaWrapper = &metaReaderStable.me.stbEntry.schemaTag;

    // Pass 1: declare one output column per requested id.
    for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
      col_id_t* id = taosArrayGet(cols, i);
      STREAM_CHECK_NULL_GOTO(id, terrno);
      if (*id == -1) {
        SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN, -1);
        STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
        continue;
      }
      size_t j = 0;
      for (; j < sSchemaWrapper->nCols; j++) {
        SSchema* s = sSchemaWrapper->pSchema + j;
        if (s->colId == *id) {
          SColumnInfoData idata = createColumnInfoData(s->type, s->bytes, s->colId);
          STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
          break;
        }
      }
      if (j == sSchemaWrapper->nCols) {
        // Unknown tag id: keep the column position but mark it NULL-typed.
        SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_NULL, CHAR_BYTES, *id);
        STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
      }
    }
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, 1));
    pBlock->info.rows = 1;

    // Pass 2: fill row 0 of every declared column.
    for (size_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
      STREAM_CHECK_NULL_GOTO(pDst, terrno);

      if (pDst->info.colId == -1) {
        STREAM_CHECK_RET_GOTO(varColSetVarData(pDst, 0, metaReader.me.name, strlen(metaReader.me.name), false));
        continue;
      }
      if (pDst->info.type == TSDB_DATA_TYPE_NULL) {
        STREAM_CHECK_RET_GOTO(colDataSetVal(pDst, 0, NULL, true));
        continue;
      }

      STagVal val = {0};
      val.cid = pDst->info.colId;
      const char* p = api.metaFn.extractTagVal(metaReader.me.ctbEntry.pTags, pDst->info.type, &val);

      bool  isJson = (pDst->info.type == TSDB_DATA_TYPE_JSON);
      char* data = NULL;
      if (!isJson && p != NULL) {
        data = tTagValToData((const STagVal*)p, false);
      } else {
        data = (char*)p;
      }

      // data is freed only for variable-length, non-JSON tags (same rule as before); presumably
      // tTagValToData hands back an owned buffer in exactly that case.
      bool ownsData = !isJson && (p != NULL) && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) && (data != NULL);
      bool isNull = (data == NULL) || (isJson && tTagIsJsonNull(data));

      // Free before checking the result so the buffer is not leaked when colDataSetVal fails.
      int32_t setCode = colDataSetVal(pDst, 0, data, isNull);
      if (ownsData) {
        taosMemoryFree(data);
      }
      STREAM_CHECK_CONDITION_GOTO(setCode != 0, setCode);
    }
  } else {
    // Defensive: the type check above already rejects every other table type.
    stError("vgId:%d %s, invalid table type:%d", TD_VID(pVnode), __func__, metaReader.me.type);
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  stsDebug("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlock->info.rows);
  printDataBlock(pBlock, __func__, "");
  STREAM_CHECK_RET_GOTO(buildRsp(pBlock, &buf, &size));

end:
  api.metaReaderFn.clearReader(&metaReaderStable);
  api.metaReaderFn.clearReader(&metaReader);
  STREAM_PRINT_LOG_END(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  blockDataDestroy(pBlock);
  return code;
}
2023

2024
/**
 * Handle a TDMT_STREAM_FETCH message: run (or re-create and run) the calc scan task that
 * belongs to req.queryId / req.taskId / req.execId and send the produced blocks back.
 *
 * The executor task is rebuilt when the request asks for a reset or when no task has been
 * created yet; otherwise the cached task is reused. A TDMT_STREAM_FETCH_RSP response is always
 * sent, carrying either the serialized result or the error code.
 *
 * @param pVnode vnode that serves the scan
 * @param pMsg   incoming rpc message; pCont/contLen hold the serialized SResFetchReq and
 *               pMsg->info is copied into the response
 * @return 0 on success, otherwise the error code that was also put into the response
 */
static int32_t vnodeProcessStreamFetchMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;
  void*   taskAddr = NULL;
  SArray* pResList = NULL;

  SResFetchReq req = {0};
  STREAM_CHECK_CONDITION_GOTO(tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0,
                              TSDB_CODE_QRY_INVALID_INPUT);
  SArray* calcInfoList = (SArray*)qStreamGetReaderInfo(req.queryId, req.taskId, &taskAddr);
  STREAM_CHECK_NULL_GOTO(calcInfoList, terrno);

  STREAM_CHECK_CONDITION_GOTO(req.execId < 0, TSDB_CODE_INVALID_PARA);
  SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo = taosArrayGetP(calcInfoList, req.execId);
  STREAM_CHECK_NULL_GOTO(sStreamReaderCalcInfo, terrno);
  // NOTE(review): pTask is not referenced directly below; presumably ST_TASK_DLOG picks it up.
  void* pTask = sStreamReaderCalcInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, execId:%d, reset:%d, pTaskInfo:%p, scan type:%d", TD_VID(pVnode), __func__, req.execId, req.reset,
               sStreamReaderCalcInfo->pTaskInfo, nodeType(sStreamReaderCalcInfo->calcAst->pNode));

  if (req.reset || sStreamReaderCalcInfo->pTaskInfo == NULL) {
    qDestroyTask(sStreamReaderCalcInfo->pTaskInfo);
    // Drop the cached pointer right away: any failure between here and the successful
    // re-creation below would otherwise leave a dangling task for the next fetch to run or
    // destroy a second time.
    sStreamReaderCalcInfo->pTaskInfo = NULL;

    // With a dynamic table name, the table to scan is the partition value flagged as tbname.
    int64_t uid = 0;
    if (req.dynTbname) {
      SArray* vals = req.pStRtFuncInfo->pStreamPartColVals;
      for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
        SStreamGroupValue* pValue = taosArrayGet(vals, i);
        if (pValue != NULL && pValue->isTbname) {
          uid = pValue->uid;
          break;
        }
      }
    }

    SReadHandle handle = {0};
    handle.vnode = pVnode;
    handle.uid = uid;

    initStorageAPI(&handle.api);
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(sStreamReaderCalcInfo->calcAst->pNode) ||
        QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == nodeType(sStreamReaderCalcInfo->calcAst->pNode)) {
      STimeRangeNode* node = (STimeRangeNode*)((STableScanPhysiNode*)(sStreamReaderCalcInfo->calcAst->pNode))->pTimeRange;
      if (node != NULL) {
        STREAM_CHECK_RET_GOTO(processCalaTimeRange(sStreamReaderCalcInfo, &req, node, &handle));
      }
    }

    // Hand the request's runtime function info over to the calc info; the previous contents
    // end up in req and are released together with it at the end.
    TSWAP(sStreamReaderCalcInfo->rtInfo.funcInfo, *req.pStRtFuncInfo);
    handle.streamRtInfo = &sStreamReaderCalcInfo->rtInfo;

    // The task is always rebuilt from the scan plan here rather than reset in place.
    STREAM_CHECK_RET_GOTO(qCreateStreamExecTaskInfo(&sStreamReaderCalcInfo->pTaskInfo,
                                                    sStreamReaderCalcInfo->calcScanPlan, &handle, NULL, TD_VID(pVnode),
                                                    req.taskId));

    STREAM_CHECK_RET_GOTO(qSetTaskId(sStreamReaderCalcInfo->pTaskInfo, req.taskId, req.queryId));
  }

  if (req.pOpParam != NULL) {
    qUpdateOperatorParam(sStreamReaderCalcInfo->pTaskInfo, req.pOpParam);
  }

  pResList = taosArrayInit(4, POINTER_BYTES);
  STREAM_CHECK_NULL_GOTO(pResList, terrno);
  uint64_t ts = 0;
  bool     hasNext = false;
  STREAM_CHECK_RET_GOTO(qExecTaskOpt(sStreamReaderCalcInfo->pTaskInfo, pResList, &ts, &hasNext, NULL, req.pOpParam != NULL));

  for (size_t i = 0; i < taosArrayGetSize(pResList); i++) {
    SSDataBlock* pBlock = taosArrayGetP(pResList, i);
    if (pBlock == NULL) continue;
    printDataBlock(pBlock, __func__, "fetch");
    // The calc filter is applied here only when the request uses an external window.
    if (sStreamReaderCalcInfo->rtInfo.funcInfo.withExternalWindow) {
      STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, sStreamReaderCalcInfo->pFilterInfo));
      printDataBlock(pBlock, __func__, "fetch filter");
    }
  }

  STREAM_CHECK_RET_GOTO(streamBuildFetchRsp(pResList, hasNext, &buf, &size, pVnode->config.tsdbCfg.precision));
  ST_TASK_DLOG("vgId:%d %s end:", TD_VID(pVnode), __func__);

end:
  taosArrayDestroy(pResList);
  streamReleaseTask(taskAddr);

  STREAM_PRINT_LOG_END(code, lino);
  SRpcMsg rsp = {.msgType = TDMT_STREAM_FETCH_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  tDestroySResFetchReq(&req);
  return code;
}
2119

2120
/**
 * Entry point for messages taken from the vnode's stream reader queue.
 *
 * - If the vnode is not ready for reads, the message is redirected and 0 is returned.
 * - TDMT_STREAM_FETCH is forwarded to vnodeProcessStreamFetchMsg.
 * - TDMT_STREAM_TRIGGER_PULL is decoded (payload follows an SMsgHead) and dispatched on
 *   req.base.type to the matching vnodeProcessStream*Req handler.
 * - Any other message type, or an unknown pull type, yields TSDB_CODE_APP_ERROR.
 *
 * @param pVnode vnode processing the message
 * @param pMsg   rpc message from the stream reader queue
 * @return 0 on success or redirect, otherwise the error code of the failing step
 */
int32_t vnodeProcessStreamReaderMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  int32_t                   code = 0;
  int32_t                   lino = 0;
  void*                     taskAddr = NULL;
  void*                     pBody = NULL;
  int32_t                   bodyLen = 0;
  SStreamTriggerReaderInfo* pReaderInfo = NULL;
  SSTriggerPullRequestUnion req = {0};

  vDebug("vgId:%d, msg:%p in stream reader queue is processing", pVnode->config.vgId, pMsg);
  if (!syncIsReadyForRead(pVnode->sync)) {
    vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
    return 0;
  }

  // Fetch messages are handled entirely by their own handler.
  if (pMsg->msgType == TDMT_STREAM_FETCH) {
    return vnodeProcessStreamFetchMsg(pVnode, pMsg);
  }

  if (pMsg->msgType != TDMT_STREAM_TRIGGER_PULL) {
    vError("unknown msg type:%d in stream reader queue", pMsg->msgType);
    code = TSDB_CODE_APP_ERROR;
    goto end;
  }

  // The serialized pull request starts right after the message head.
  pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  bodyLen = pMsg->contLen - sizeof(SMsgHead);
  STREAM_CHECK_RET_GOTO(tDeserializeSTriggerPullRequest(pBody, bodyLen, &req));
  stDebug("vgId:%d %s start, type:%d, streamId:%" PRIx64 ", readerTaskId:%" PRIx64 ", sessionId:%" PRIx64,
          TD_VID(pVnode), __func__, req.base.type, req.base.streamId, req.base.readerTaskId, req.base.sessionId);

  // Every pull type except STRIGGER_PULL_OTABLE_INFO looks up the reader info of its task.
  if (STRIGGER_PULL_OTABLE_INFO != req.base.type) {
    pReaderInfo = qStreamGetReaderInfo(req.base.streamId, req.base.readerTaskId, &taskAddr);
  }

  switch (req.base.type) {
    case STRIGGER_PULL_SET_TABLE:
      code = vnodeProcessStreamSetTableReq(pVnode, pMsg, &req, pReaderInfo);
      break;

    case STRIGGER_PULL_LAST_TS:
      code = vnodeProcessStreamLastTsReq(pVnode, pMsg, &req, pReaderInfo);
      break;

    case STRIGGER_PULL_FIRST_TS:
      code = vnodeProcessStreamFirstTsReq(pVnode, pMsg, &req, pReaderInfo);
      break;

    case STRIGGER_PULL_TSDB_META:
    case STRIGGER_PULL_TSDB_META_NEXT:
      code = vnodeProcessStreamTsdbMetaReq(pVnode, pMsg, &req, pReaderInfo);
      break;

    case STRIGGER_PULL_TSDB_TS_DATA:
      code = vnodeProcessStreamTsdbTsDataReq(pVnode, pMsg, &req, pReaderInfo);
      break;

    case STRIGGER_PULL_TSDB_TRIGGER_DATA:
    case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT:
      code = vnodeProcessStreamTsdbTriggerDataReq(pVnode, pMsg, &req, pReaderInfo);
      break;

    case STRIGGER_PULL_TSDB_CALC_DATA:
    case STRIGGER_PULL_TSDB_CALC_DATA_NEXT:
      code = vnodeProcessStreamTsdbCalcDataReq(pVnode, pMsg, &req, pReaderInfo);
      break;

    case STRIGGER_PULL_TSDB_DATA:
    case STRIGGER_PULL_TSDB_DATA_NEXT:
      code = vnodeProcessStreamTsdbVirtalDataReq(pVnode, pMsg, &req, pReaderInfo);
      break;

    case STRIGGER_PULL_WAL_META:
      code = vnodeProcessStreamWalMetaReq(pVnode, pMsg, &req, pReaderInfo);
      break;

    case STRIGGER_PULL_WAL_TS_DATA:
    case STRIGGER_PULL_WAL_TRIGGER_DATA:
    case STRIGGER_PULL_WAL_CALC_DATA:
    case STRIGGER_PULL_WAL_DATA:
      code = vnodeProcessStreamWalDataReq(pVnode, pMsg, &req, pReaderInfo);
      break;

    case STRIGGER_PULL_GROUP_COL_VALUE:
      code = vnodeProcessStreamGroupColValueReq(pVnode, pMsg, &req, pReaderInfo);
      break;

    case STRIGGER_PULL_VTABLE_INFO:
      code = vnodeProcessStreamVTableInfoReq(pVnode, pMsg, &req, pReaderInfo);
      break;

    // The two handlers below do not take the reader info.
    case STRIGGER_PULL_VTABLE_PSEUDO_COL:
      code = vnodeProcessStreamVTableTagInfoReq(pVnode, pMsg, &req);
      break;

    case STRIGGER_PULL_OTABLE_INFO:
      code = vnodeProcessStreamOTableInfoReq(pVnode, pMsg, &req);
      break;

    default:
      vError("unknown inner msg type:%d in stream reader queue", req.base.type);
      code = TSDB_CODE_APP_ERROR;
      break;
  }

end:
  // taskAddr stays NULL unless the reader-info lookup above ran.
  streamReleaseTask(taskAddr);
  tDestroySTriggerPullRequest(&req);
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc