• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4667

14 Aug 2025 01:04PM UTC coverage: 59.532% (-0.6%) from 60.112%
#4667

push

travis-ci

web-flow
fix(query): fix order by column check of union operator (#32524)

136437 of 292055 branches covered (46.72%)

Branch coverage included in aggregate %.

1 of 13 new or added lines in 1 file covered. (7.69%)

2683 existing lines in 164 files now uncovered.

206730 of 284385 relevant lines covered (72.69%)

4603587.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.16
/source/dnode/vnode/src/vnd/vnodeStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "scalar.h"
17
#include "streamReader.h"
18
#include "tdatablock.h"
19
#include "tdb.h"
20
#include "tencode.h"
21
#include "tglobal.h"
22
#include "tmsg.h"
23
#include "vnd.h"
24
#include "vnode.h"
25
#include "vnodeInt.h"
26

27
#define BUILD_OPTION(options, sStreamInfo, _ver, _groupSort, _order, startTime, endTime, _schemas, _isSchema, _scanMode, \
28
                     _gid, _initReader, _uidList)                                                                        \
29
  SStreamTriggerReaderTaskInnerOptions options = {.suid = (_uidList == NULL ? sStreamInfo->suid : 0),              \
30
                                                  .uid = sStreamInfo->uid,                                         \
31
                                                  .ver = _ver,                                                     \
32
                                                  .tableType = sStreamInfo->tableType,                             \
33
                                                  .groupSort = _groupSort,                                         \
34
                                                  .order = _order,                                                 \
35
                                                  .twindows = {.skey = startTime, .ekey = endTime},                \
36
                                                  .pTagCond = sStreamInfo->pTagCond,                               \
37
                                                  .pTagIndexCond = sStreamInfo->pTagIndexCond,                     \
38
                                                  .pConditions = sStreamInfo->pConditions,                         \
39
                                                  .partitionCols = sStreamInfo->partitionCols,                     \
40
                                                  .schemas = _schemas,                                             \
41
                                                  .isSchema = _isSchema,                                           \
42
                                                  .scanMode = _scanMode,                                           \
43
                                                  .gid = _gid,                                                     \
44
                                                  .initReader = _initReader,                                       \
45
                                                  .uidList = _uidList};
46

47
static int64_t getSessionKey(int64_t session, int64_t type) { return (session | (type << 32)); }
29,020✔
48

49
static int32_t buildScheamFromMeta(SVnode* pVnode, int64_t uid, SArray** schemas);
50

51
static int32_t addColData(SSDataBlock* pResBlock, int32_t index, void* data) {
37,431✔
52
  SColumnInfoData* pSrc = taosArrayGet(pResBlock->pDataBlock, index);
37,431✔
53
  if (pSrc == NULL) {
37,422✔
54
    return terrno;
14✔
55
  }
56

57
  memcpy(pSrc->pData + pResBlock->info.rows * pSrc->info.bytes, data, pSrc->info.bytes);
37,408✔
58
  return 0;
37,408✔
59
}
60

61
static int32_t getTableDataInfo(SStreamReaderTaskInner* pTask, bool* hasNext) {
33,524✔
62
  int32_t code = pTask->api.tsdReader.tsdNextDataBlock(pTask->pReader, hasNext);
33,524✔
63
  if (code != TSDB_CODE_SUCCESS) {
33,528!
UNCOV
64
    pTask->api.tsdReader.tsdReaderReleaseDataBlock(pTask->pReader);
×
65
  }
66

67
  return code;
33,528✔
68
}
69

70
static int32_t getTableData(SStreamReaderTaskInner* pTask, SSDataBlock** ppRes) {
3,002✔
71
  return pTask->api.tsdReader.tsdReaderRetrieveDataBlock(pTask->pReader, ppRes, NULL);
3,002✔
72
}
73

74
static int32_t buildOTableInfoRsp(const SSTriggerOrigTableInfoRsp* rsp, void** data, size_t* size) {
66✔
75
  int32_t code = 0;
66✔
76
  int32_t lino = 0;
66✔
77
  void*   buf = NULL;
66✔
78
  int32_t len = tSerializeSTriggerOrigTableInfoRsp(NULL, 0, rsp);
66✔
79
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
66!
80
  buf = rpcMallocCont(len);
66✔
81
  STREAM_CHECK_NULL_GOTO(buf, terrno);
66!
82
  int32_t actLen = tSerializeSTriggerOrigTableInfoRsp(buf, len, rsp);
66✔
83
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
66!
84
  *data = buf;
66✔
85
  *size = len;
66✔
86
  buf = NULL;
66✔
87
end:
66✔
88
  rpcFreeCont(buf);
66✔
89
  return code;
66✔
90
}
91

92
static int32_t buildVTableInfoRsp(const SStreamMsgVTableInfo* rsp, void** data, size_t* size) {
151✔
93
  int32_t code = 0;
151✔
94
  int32_t lino = 0;
151✔
95
  void*   buf = NULL;
151✔
96
  int32_t len = tSerializeSStreamMsgVTableInfo(NULL, 0, rsp);
151✔
97
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
151!
98
  buf = rpcMallocCont(len);
151✔
99
  STREAM_CHECK_NULL_GOTO(buf, terrno);
151!
100
  int32_t actLen = tSerializeSStreamMsgVTableInfo(buf, len, rsp);
151✔
101
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
151!
102
  *data = buf;
151✔
103
  *size = len;
151✔
104
  buf = NULL;
151✔
105
end:
151✔
106
  rpcFreeCont(buf);
151✔
107
  return code;
151✔
108
}
109

110
static int32_t buildTsRsp(const SStreamTsResponse* tsRsp, void** data, size_t* size) {
671✔
111
  int32_t code = 0;
671✔
112
  int32_t lino = 0;
671✔
113
  void*   buf = NULL;
671✔
114
  int32_t len = tSerializeSStreamTsResponse(NULL, 0, tsRsp);
671✔
115
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
671!
116
  buf = rpcMallocCont(len);
671✔
117
  STREAM_CHECK_NULL_GOTO(buf, terrno);
671!
118
  int32_t actLen = tSerializeSStreamTsResponse(buf, len, tsRsp);
671✔
119
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
671!
120
  *data = buf;
671✔
121
  *size = len;
671✔
122
  buf = NULL;
671✔
123
end:
671✔
124
  rpcFreeCont(buf);
671✔
125
  return code;
671✔
126
}
127

128

129
static int32_t buildRsp(SSDataBlock* pBlock, void** data, size_t* size) {
45,120✔
130
  int32_t code = 0;
45,120✔
131
  int32_t lino = 0;
45,120✔
132
  void*   buf = NULL;
45,120✔
133
  STREAM_CHECK_CONDITION_GOTO(pBlock == NULL || pBlock->info.rows == 0, TSDB_CODE_SUCCESS);
45,120!
134
  size_t dataEncodeSize = blockGetEncodeSize(pBlock);
10,583✔
135
  buf = rpcMallocCont(dataEncodeSize);
10,581✔
136
  STREAM_CHECK_NULL_GOTO(buf, terrno);
10,587!
137
  int32_t actualLen = blockEncode(pBlock, buf, dataEncodeSize, taosArrayGetSize(pBlock->pDataBlock));
10,587✔
138
  STREAM_CHECK_CONDITION_GOTO(actualLen < 0, terrno);
10,582!
139
  *data = buf;
10,582✔
140
  *size = dataEncodeSize;
10,582✔
141
  buf = NULL;
10,582✔
142
end:
45,119✔
143
  rpcFreeCont(buf);
45,119✔
144
  return code;
45,125✔
145
}
146

147
static int32_t resetTsdbReader(SStreamReaderTaskInner* pTask) {
575✔
148
  int32_t        pNum = 1;
575✔
149
  STableKeyInfo* pList = NULL;
575✔
150
  int32_t        code = 0;
575✔
151
  int32_t        lino = 0;
575✔
152
  STREAM_CHECK_RET_GOTO(qStreamGetTableList(pTask->pTableList, pTask->currentGroupIndex, &pList, &pNum));
575!
153
  STREAM_CHECK_RET_GOTO(pTask->api.tsdReader.tsdSetQueryTableList(pTask->pReader, pList, pNum));
576!
154

155
  cleanupQueryTableDataCond(&pTask->cond);
577✔
156
  STREAM_CHECK_RET_GOTO(qStreamInitQueryTableDataCond(&pTask->cond, pTask->options.order, pTask->options.schemas, true,
576!
157
                                                      pTask->options.twindows, pTask->options.suid, pTask->options.ver));
158
  STREAM_CHECK_RET_GOTO(pTask->api.tsdReader.tsdReaderResetStatus(pTask->pReader, &pTask->cond));
577!
159

160
end:
577✔
161
  STREAM_PRINT_LOG_END(code, lino);
577!
162
  return code;
577✔
163
}
164

165
static int32_t buildWalMetaBlock(SSDataBlock* pBlock, int8_t type, int64_t gid, bool isVTable, int64_t uid,
5,813✔
166
                                 int64_t skey, int64_t ekey, int64_t ver, int64_t rows) {
167
  int32_t code = 0;
5,813✔
168
  int32_t lino = 0;
5,813✔
169
  int32_t index = 0;
5,813✔
170
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &type));
5,813!
171
  if (!isVTable) {
5,807✔
172
    STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &gid));
2,819!
173
  }
174
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &uid));
5,785!
175
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &skey));
5,782!
176
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &ekey));
5,779!
177
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &ver));
5,776!
178
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &rows));
5,783!
179

180
end:
5,790✔
181
  STREAM_PRINT_LOG_END(code, lino)
5,790!
182
  return code;
5,792✔
183
}
184

185
static void buildTSchema(STSchema* pTSchema, int32_t ver, col_id_t colId, int8_t type, int32_t bytes) {
5,753✔
186
  pTSchema->numOfCols = 1;
5,753✔
187
  pTSchema->version = ver;
5,753✔
188
  pTSchema->columns[0].colId = colId;
5,753✔
189
  pTSchema->columns[0].type = type;
5,753✔
190
  pTSchema->columns[0].bytes = bytes;
5,753✔
191
}
5,753✔
192

193
int32_t retrieveWalMetaData(SSubmitTbData* pSubmitTbData, void* pTableList, bool isVTable, SSDataBlock* pBlock,
43,277✔
194
                            int64_t ver) {
195
  int32_t code = 0;
43,277✔
196
  int32_t lino = 0;
43,277✔
197

198
  int64_t   uid = pSubmitTbData->uid;
43,277✔
199
  int32_t   numOfRows = 0;
43,277✔
200
  int64_t   skey = 0;
43,277✔
201
  int64_t   ekey = 0;
43,277✔
202
  STSchema* pTSchema = NULL;
43,277✔
203
  uint64_t  gid = 0;
43,277✔
204
  if (isVTable) {
43,277✔
205
    STREAM_CHECK_CONDITION_GOTO(taosHashGet(pTableList, &uid, sizeof(uid)) == NULL, TDB_CODE_SUCCESS);
9,735✔
206
  } else {
207
    STREAM_CHECK_CONDITION_GOTO(!qStreamUidInTableList(pTableList, uid), TDB_CODE_SUCCESS);
33,542✔
208
    gid = qStreamGetGroupId(pTableList, uid);
2,740✔
209
  }
210

211
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
5,764✔
212
    SColData* pCol = taosArrayGet(pSubmitTbData->aCol, 0);
10✔
213
    STREAM_CHECK_NULL_GOTO(pCol, terrno);
10!
214
    numOfRows = pCol->nVal;
10✔
215

216
    SColVal colVal = {0};
10✔
217
    STREAM_CHECK_RET_GOTO(tColDataGetValue(pCol, 0, &colVal));
10!
218
    skey = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
10✔
219

220
    STREAM_CHECK_RET_GOTO(tColDataGetValue(pCol, numOfRows - 1, &colVal));
10!
221
    ekey = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
10✔
222
  } else {
223
    numOfRows = taosArrayGetSize(pSubmitTbData->aRowP);
5,754✔
224
    SRow* pRow = taosArrayGetP(pSubmitTbData->aRowP, 0);
5,749✔
225
    STREAM_CHECK_NULL_GOTO(pRow, terrno);
5,741!
226
    SColVal colVal = {0};
5,741✔
227
    pTSchema = taosMemoryCalloc(1, sizeof(STSchema) + sizeof(STColumn));
5,741!
228
    STREAM_CHECK_NULL_GOTO(pTSchema, terrno);
5,781!
229
    buildTSchema(pTSchema, pSubmitTbData->sver, PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES);
5,781✔
230
    STREAM_CHECK_RET_GOTO(tRowGet(pRow, pTSchema, 0, &colVal));
5,750!
231
    skey = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
5,751✔
232
    pRow = taosArrayGetP(pSubmitTbData->aRowP, numOfRows - 1);
5,751✔
233
    STREAM_CHECK_RET_GOTO(tRowGet(pRow, pTSchema, 0, &colVal));
5,751!
234
    ekey = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
5,755✔
235
  }
236

237
  STREAM_CHECK_RET_GOTO(buildWalMetaBlock(pBlock, WAL_SUBMIT_DATA, gid, isVTable, uid, skey, ekey, ver, numOfRows));
5,765!
238
  pBlock->info.rows++;
5,733✔
239
  stDebug("stream reader scan submit data:uid %" PRIu64 ", skey %" PRIu64 ", ekey %" PRIu64 ", gid %" PRIu64
5,733✔
240
          ", rows:%d",
241
          uid, skey, ekey, gid, numOfRows);
242

243
end:
2,512✔
244
  taosMemoryFree(pTSchema);
43,415!
245
  STREAM_PRINT_LOG_END(code, lino)
43,422!
246
  return code;
43,429✔
247
}
248

249
int32_t retrieveWalData(SVnode* pVnode, SSubmitTbData* pSubmitTbData, SSDataBlock* pBlock, STimeWindow* window) {
7,626✔
250
  stDebug("stream reader retrieve data block %p", pSubmitTbData);
7,626✔
251
  int32_t code = 0;
7,626✔
252
  int32_t lino = 0;
7,626✔
253

254
  int32_t ver = pSubmitTbData->sver;
7,626✔
255
  int64_t uid = pSubmitTbData->uid;
7,626✔
256
  int32_t numOfRows = 0;
7,626✔
257

258
  STSchema* schemas = metaGetTbTSchema(pVnode->pMeta, uid, ver, 1);
7,626✔
259
  STREAM_CHECK_NULL_GOTO(schemas, TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND);
7,633!
260
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
7,633✔
261
    SColData* pCol = taosArrayGet(pSubmitTbData->aCol, 0);
179✔
262
    STREAM_CHECK_NULL_GOTO(pCol, terrno);
179!
263
    int32_t rowStart = 0;
179✔
264
    int32_t rowEnd = pCol->nVal;
179✔
265
    if (window != NULL) {
179!
266
      SColVal colVal = {0};
179✔
267
      rowStart = -1;
179✔
268
      rowEnd = -1;
179✔
269
      for (int32_t k = 0; k < pCol->nVal; k++) {
3,222✔
270
        STREAM_CHECK_RET_GOTO(tColDataGetValue(pCol, k, &colVal));
3,043!
271
        int64_t ts = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
3,043✔
272
        if (ts >= window->skey && rowStart == -1) {
3,043!
273
          rowStart = k;
179✔
274
        }
275
        if (ts > window->ekey && rowEnd == -1) {
3,043!
UNCOV
276
          rowEnd = k;
×
277
        }
278
      }
279
      STREAM_CHECK_CONDITION_GOTO(rowStart == -1 || rowStart == rowEnd, TDB_CODE_SUCCESS);
179!
280

281
      if (rowStart != -1 && rowEnd == -1) {
179!
282
        rowEnd = pCol->nVal;
179✔
283
      }
284
    }
285
    numOfRows = rowEnd - rowStart;
179✔
286
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, numOfRows));
179!
287
    for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
766✔
288
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
587✔
289
      STREAM_CHECK_NULL_GOTO(pColData, terrno);
587!
290
      if (i >= taosArrayGetSize(pSubmitTbData->aCol)) {
587!
UNCOV
291
        break;
×
292
      }
293
      for (int32_t j = 0; j < taosArrayGetSize(pSubmitTbData->aCol); j++) {
5,283✔
294
        SColData* pCol = taosArrayGet(pSubmitTbData->aCol, j);
4,696✔
295
        STREAM_CHECK_NULL_GOTO(pCol, terrno);
4,696!
296
        if (pCol->cid == pColData->info.colId) {
4,696✔
297
          for (int32_t k = rowStart; k < rowEnd; k++) {
10,566✔
298
            SColVal colVal = {0};
9,979✔
299
            STREAM_CHECK_RET_GOTO(tColDataGetValue(pCol, k, &colVal));
9,979!
300
            STREAM_CHECK_RET_GOTO(colDataSetVal(pColData, k, VALUE_GET_DATUM(&colVal.value, colVal.value.type),
9,979!
301
                                                !COL_VAL_IS_VALUE(&colVal)));
302
          }
303
        }
304
      }
305
    }
306
  } else {
307
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, taosArrayGetSize(pSubmitTbData->aRowP)));
7,454!
308
    for (int32_t j = 0; j < taosArrayGetSize(pSubmitTbData->aRowP); j++) {
523,441✔
309
      SRow* pRow = taosArrayGetP(pSubmitTbData->aRowP, j);
515,993✔
310
      STREAM_CHECK_NULL_GOTO(pRow, terrno);
516,000!
311
      SColVal colVal = {0};
516,000✔
312
      STREAM_CHECK_RET_GOTO(tRowGet(pRow, schemas, 0, &colVal));
516,000!
313
      int64_t ts = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
516,002✔
314
      if (window != NULL && (ts < window->skey || ts > window->ekey)) {
516,002✔
315
        continue;
45✔
316
      }
317
      for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {  // reader todo test null
1,554,738✔
318
        if (i >= schemas->numOfCols) {
1,552,077✔
319
          break;
513,270✔
320
        }
321
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
1,038,807✔
322
        STREAM_CHECK_NULL_GOTO(pColData, terrno);
1,038,805✔
323
        SColVal colVal = {0};
1,038,803✔
324
        int32_t sourceIdx = 0;
1,038,803✔
325
        while (1) {
326
          STREAM_CHECK_RET_GOTO(tRowGet(pRow, schemas, sourceIdx, &colVal));
1,582,506!
327
          if (colVal.cid < pColData->info.colId) {
1,582,481✔
328
            sourceIdx++;
543,703✔
329
            continue;
543,703✔
330
          } else {
331
            break;
1,038,778✔
332
          }
333
        }
334
        if (colVal.cid == pColData->info.colId && COL_VAL_IS_VALUE(&colVal)) {
1,038,778!
335
          if (IS_VAR_DATA_TYPE(colVal.value.type) || colVal.value.type == TSDB_DATA_TYPE_DECIMAL){
1,033,993!
336
            STREAM_CHECK_RET_GOTO(varColSetVarData(pColData, numOfRows, colVal.value.pData, colVal.value.nData, !COL_VAL_IS_VALUE(&colVal)));
24!
337
          } else {
338
            STREAM_CHECK_RET_GOTO(colDataSetVal(pColData, numOfRows, (const char*)(&(colVal.value.val)), !COL_VAL_IS_VALUE(&colVal)));
1,033,969!
339
          }
340
        } else {
341
          colDataSetNULL(pColData, numOfRows);
4,785!
342
        }
343
      }
344
      numOfRows++;
515,946✔
345
    }
346
  }
347

348
  pBlock->info.rows = numOfRows;
7,630✔
349

350
end:
7,630✔
351
  taosMemoryFree(schemas);
7,630!
352
  STREAM_PRINT_LOG_END(code, lino)
7,631!
353
  return code;
7,630✔
354
}
355

356
static int32_t buildDeleteData(void* pTableList, bool isVTable, SSDataBlock* pBlock, SDeleteRes* req, int64_t uid, int64_t ver){
124✔
357
  int32_t    code = 0;
124✔
358
  int32_t    lino = 0;
124✔
359
  uint64_t   gid = 0;
124✔
360
  stDebug("stream reader scan delete start data:uid %" PRIu64 ", skey %" PRIu64 ", ekey %" PRIu64, uid, req->skey, req->ekey);
124✔
361
  if (isVTable) {
124✔
362
    STREAM_CHECK_CONDITION_GOTO(taosHashGet(pTableList, &uid, sizeof(uid)) == NULL, TDB_CODE_SUCCESS)
25✔
363
  } else {
364
    gid = qStreamGetGroupId(pTableList, uid);
99✔
365
    STREAM_CHECK_CONDITION_GOTO(gid == -1, TDB_CODE_SUCCESS);
99✔
366
  }
367
  STREAM_CHECK_RET_GOTO(
62!
368
      buildWalMetaBlock(pBlock, WAL_DELETE_DATA, gid, isVTable, uid, req->skey, req->ekey, ver, 1));
369
  pBlock->info.rows++;
62✔
370

371
  stDebug("stream reader scan delete end data:uid %" PRIu64 ", skey %" PRIu64 ", ekey %" PRIu64, uid, req->skey, req->ekey);
62✔
372
end:
48✔
373
  return code;
124✔
374
}
375

376
static int32_t scanDeleteData(void* pTableList, bool isVTable, SSDataBlock* pBlock, void* data, int32_t len,
124✔
377
                              int64_t ver) {
378
  int32_t    code = 0;
124✔
379
  int32_t    lino = 0;
124✔
380
  SDecoder   decoder = {0};
124✔
381
  SDeleteRes req = {0};
124✔
382
  req.uidList = taosArrayInit(0, sizeof(tb_uid_t));
124✔
383
  tDecoderInit(&decoder, data, len);
124✔
384
  STREAM_CHECK_RET_GOTO(tDecodeDeleteRes(&decoder, &req));
124!
385
  
386
  for (int32_t i = 0; i < taosArrayGetSize(req.uidList); i++) {
248✔
387
    uint64_t* uid = taosArrayGet(req.uidList, i);
124✔
388
    STREAM_CHECK_NULL_GOTO(uid, terrno);
124!
389
    STREAM_CHECK_RET_GOTO(buildDeleteData(pTableList, isVTable, pBlock, &req, *uid, ver));
124!
390
  }
391

392
end:
124✔
393
  taosArrayDestroy(req.uidList);
124✔
394
  tDecoderClear(&decoder);
124✔
395
  return code;
124✔
396
}
397

UNCOV
398
static int32_t scanDropTable(void* pTableList, bool isVTable, SSDataBlock* pBlock, void* data, int32_t len,
×
399
                             int64_t ver) {
400
  int32_t  code = 0;
×
401
  int32_t  lino = 0;
×
UNCOV
402
  SDecoder decoder = {0};
×
403

404
  SVDropTbBatchReq req = {0};
×
405
  tDecoderInit(&decoder, data, len);
×
406
  STREAM_CHECK_RET_GOTO(tDecodeSVDropTbBatchReq(&decoder, &req));
×
407
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
408
    SVDropTbReq* pDropTbReq = req.pReqs + iReq;
×
409
    STREAM_CHECK_NULL_GOTO(pDropTbReq, TSDB_CODE_INVALID_PARA);
×
410
    uint64_t gid = 0;
×
411
    if (isVTable) {
×
412
      if (taosHashGet(pTableList, &pDropTbReq->uid, sizeof(pDropTbReq->uid)) == NULL) {
×
UNCOV
413
        continue;
×
414
      }
415
    } else {
416
      gid = qStreamGetGroupId(pTableList, pDropTbReq->uid);
×
UNCOV
417
      if (gid == -1) continue;
×
418
    }
419

420
    STREAM_CHECK_RET_GOTO(buildWalMetaBlock(pBlock, WAL_DELETE_TABLE, gid, isVTable, pDropTbReq->uid, 0, 0, ver, 1));
×
421
    pBlock->info.rows++;
×
UNCOV
422
    stDebug("stream reader scan drop :uid %" PRIu64 ", gid %" PRIu64, pDropTbReq->uid, gid);
×
423
  }
424

425
end:
×
426
  tDecoderClear(&decoder);
×
UNCOV
427
  return code;
×
428
}
429

430
static int32_t scanSubmitData(void* pTableList, bool isVTable, SSDataBlock* pBlock, void* data, int32_t len,
43,368✔
431
                              int64_t ver) {
432
  int32_t  code = 0;
43,368✔
433
  int32_t  lino = 0;
43,368✔
434
  SDecoder decoder = {0};
43,368✔
435

436
  SSubmitReq2 submit = {0};
43,368✔
437
  tDecoderInit(&decoder, data, len);
43,368✔
438
  STREAM_CHECK_RET_GOTO(tDecodeSubmitReq(&decoder, &submit, NULL));
43,285!
439

440
  int32_t numOfBlocks = taosArrayGetSize(submit.aSubmitTbData);
43,329✔
441

442
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, pBlock->info.rows + numOfBlocks));
43,308!
443

444
  int32_t nextBlk = -1;
43,301✔
445
  while (++nextBlk < numOfBlocks) {
86,716✔
446
    stDebug("stream reader scan submit, next data block %d/%d", nextBlk, numOfBlocks);
43,298✔
447
    SSubmitTbData* pSubmitTbData = taosArrayGet(submit.aSubmitTbData, nextBlk);
43,298✔
448
    STREAM_CHECK_NULL_GOTO(pSubmitTbData, terrno);
43,286!
449
    STREAM_CHECK_RET_GOTO(retrieveWalMetaData(pSubmitTbData, pTableList, isVTable, pBlock, ver));
43,286!
450
  }
451
end:
43,418✔
452
  tDestroySubmitReq(&submit, TSDB_MSG_FLG_DECODE);
43,418✔
453
  tDecoderClear(&decoder);
43,423✔
454
  return code;
43,455✔
455
}
456

457
static int32_t scanWal(SVnode* pVnode, void* pTableList, bool isVTable, SSDataBlock* pBlock, int64_t lastVer,
7,364✔
458
                       int8_t deleteData, int8_t deleteTb, int64_t ctime, int64_t* retVer) {
459
  int32_t code = 0;
7,364✔
460
  int32_t lino = 0;
7,364✔
461

462
  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
7,364✔
463
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);
7,356!
464
  *retVer = walGetAppliedVer(pWalReader->pWal);
7,356✔
465
  STREAM_CHECK_CONDITION_GOTO(walReaderSeekVer(pWalReader, lastVer + 1) != 0, TSDB_CODE_SUCCESS);
7,364✔
466

467
  while (1) {
44,034✔
468
    *retVer = walGetAppliedVer(pWalReader->pWal);
45,443✔
469
    STREAM_CHECK_CONDITION_GOTO(walNextValidMsg(pWalReader, true) < 0, TSDB_CODE_SUCCESS);
45,425✔
470

471
    SWalCont* wCont = &pWalReader->pHead->head;
44,014✔
472
    if (wCont->ingestTs / 1000 > ctime) break;
44,014!
473
    void*   data = POINTER_SHIFT(wCont->body, sizeof(SMsgHead));
44,014✔
474
    int32_t len = wCont->bodyLen - sizeof(SMsgHead);
44,014✔
475
    int64_t ver = wCont->version;
44,014✔
476

477
    stDebug("vgId:%d stream reader scan wal ver:%" PRId64 ", type:%d, deleteData:%d, deleteTb:%d",
44,014✔
478
      TD_VID(pVnode), ver, wCont->msgType, deleteData, deleteTb);
479
    if (wCont->msgType == TDMT_VND_DELETE && deleteData != 0) {
44,015✔
480
      STREAM_CHECK_RET_GOTO(scanDeleteData(pTableList, isVTable, pBlock, data, len, ver));
124!
481
    } else if (wCont->msgType == TDMT_VND_DROP_TABLE && deleteTb != 0) {
43,891!
UNCOV
482
      STREAM_CHECK_RET_GOTO(scanDropTable(pTableList, isVTable, pBlock, data, len, ver));
×
483
    } else if (wCont->msgType == TDMT_VND_SUBMIT) {
43,891✔
484
      data = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
43,388✔
485
      len = wCont->bodyLen - sizeof(SSubmitReq2Msg);
43,388✔
486
      STREAM_CHECK_RET_GOTO(scanSubmitData(pTableList, isVTable, pBlock, data, len, ver));
43,388!
487
    }
488

489
    if (pBlock->info.rows >= STREAM_RETURN_ROWS_NUM) {
44,034!
UNCOV
490
      break;
×
491
    }
492
  }
493

494
end:
7,359✔
495
  walCloseReader(pWalReader);
7,359✔
496
  STREAM_PRINT_LOG_END(code, lino);
7,365!
497
  return code;
7,368✔
498
}
499

500
int32_t scanWalOneVer(SVnode* pVnode, SSDataBlock* pBlock, SSDataBlock* pBlockRet,
7,633✔
501
                      int64_t ver, int64_t uid, STimeWindow* window) {
502
  int32_t     code = 0;
7,633✔
503
  int32_t     lino = 0;
7,633✔
504
  SSubmitReq2 submit = {0};
7,633✔
505
  SDecoder    decoder = {0};
7,633✔
506

507
  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
7,633✔
508
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);
7,632!
509

510
  STREAM_CHECK_RET_GOTO(walFetchHead(pWalReader, ver));
7,632!
511
  STREAM_CHECK_CONDITION_GOTO(pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT, TSDB_CODE_STREAM_WAL_VER_NOT_DATA);
7,627!
512
  STREAM_CHECK_RET_GOTO(walFetchBody(pWalReader));
7,627!
513
  void*   pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
7,633✔
514
  int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
7,633✔
515

516
  int32_t nextBlk = -1;
7,633✔
517
  tDecoderInit(&decoder, pBody, bodyLen);
7,633✔
518
  STREAM_CHECK_RET_GOTO(tDecodeSubmitReq(&decoder, &submit, NULL));
7,631!
519

520
  int32_t numOfBlocks = taosArrayGetSize(submit.aSubmitTbData);
7,630✔
521
  while (++nextBlk < numOfBlocks) {
15,247✔
522
    stDebug("stream reader next data block %d/%d", nextBlk, numOfBlocks);
7,626✔
523
    SSubmitTbData* pSubmitTbData = taosArrayGet(submit.aSubmitTbData, nextBlk);
7,626✔
524
    STREAM_CHECK_NULL_GOTO(pSubmitTbData, terrno);
7,626!
525
    if (pSubmitTbData->uid != uid) {
7,626!
526
      stDebug("stream reader skip data block uid:%" PRId64, pSubmitTbData->uid);
×
UNCOV
527
      continue;
×
528
    }
529
    STREAM_CHECK_RET_GOTO(retrieveWalData(pVnode, pSubmitTbData, pBlock, window));
7,626!
530
    printDataBlock(pBlock, __func__, "");
7,630✔
531

532
    STREAM_CHECK_RET_GOTO(blockDataMerge(pBlockRet, pBlock));
7,629!
533
    blockDataCleanup(pBlock);
7,630✔
534
  }
535

536
end:
7,621✔
537
  tDestroySubmitReq(&submit, TSDB_MSG_FLG_DECODE);
7,621✔
538
  walCloseReader(pWalReader);
7,632✔
539
  tDecoderClear(&decoder);
7,635✔
540
  STREAM_PRINT_LOG_END(code, lino);
7,635!
541
  return code;
7,635✔
542
}
543

544
static int32_t processTag(SVnode* pVnode, SExprInfo* pExpr, int32_t numOfExpr, SStorageAPI* api, SSDataBlock* pBlock) {
4,973✔
545
  int32_t     code = 0;
4,973✔
546
  int32_t     lino = 0;
4,973✔
547
  SMetaReader mr = {0};
4,973✔
548

549
  if (numOfExpr == 0) {
4,973!
UNCOV
550
    return TSDB_CODE_SUCCESS;
×
551
  }
552
  api->metaReaderFn.initReader(&mr, pVnode, META_READER_LOCK, &api->metaFn);
4,973✔
553
  code = api->metaReaderFn.getEntryGetUidCache(&mr, pBlock->info.id.uid);
4,976✔
554
  if (code != TSDB_CODE_SUCCESS) {
4,969!
UNCOV
555
    stError("failed to get table meta, uid:%" PRId64 ", code:%s", pBlock->info.id.uid, tstrerror(code));
×
556
  }
557
  api->metaReaderFn.readerReleaseLock(&mr);
4,969✔
558
  for (int32_t j = 0; j < numOfExpr; ++j) {
21,463✔
559
    const SExprInfo* pExpr1 = &pExpr[j];
16,494✔
560
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;
16,494✔
561

562
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
16,494✔
563
    STREAM_CHECK_NULL_GOTO(pColInfoData, terrno);
16,488✔
564
    if (mr.me.name == NULL) {
16,487!
565
      colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
×
UNCOV
566
      continue;
×
567
    }
568
    colInfoDataCleanup(pColInfoData, pBlock->info.rows);
16,487✔
569

570
    int32_t functionId = pExpr1->pExpr->_function.functionId;
16,487✔
571

572
    // this is to handle the tbname
573
    if (fmIsScanPseudoColumnFunc(functionId)) {
16,487✔
574
      int32_t fType = pExpr1->pExpr->_function.functionType;
4,974✔
575
      if (fType == FUNCTION_TYPE_TBNAME) {
4,974!
576
        STREAM_CHECK_RET_GOTO(setTbNameColData(pBlock, pColInfoData, functionId, mr.me.name));
4,975!
577
        pColInfoData->info.colId = -1;
4,966✔
578
      }
579
    } else {  // these are tags
580
      STagVal tagVal = {0};
11,518✔
581
      tagVal.cid = pExpr1->base.pParam[0].pCol->colId;
11,518✔
582
      const char* p = api->metaFn.extractTagVal(mr.me.ctbEntry.pTags, pColInfoData->info.type, &tagVal);
11,518✔
583

584
      char* data = NULL;
11,518✔
585
      if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
11,518!
586
        data = tTagValToData((const STagVal*)p, false);
11,519✔
587
      } else {
UNCOV
588
        data = (char*)p;
×
589
      }
590

591
      bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
11,518!
592
      if (isNullVal) {
11,518!
UNCOV
593
        colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
×
594
      } else {
595
        code = colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows, false);
11,518✔
596
        if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
11,517!
597
          taosMemoryFree(data);
7,153!
598
        }
599
        STREAM_CHECK_RET_GOTO(code);
11,520!
600
      }
601
    }
602
  }
603

604
end:
4,969✔
605
  api->metaReaderFn.clearReader(&mr);
4,969✔
606

607
  STREAM_PRINT_LOG_END(code, lino);
4,970!
608
  return code;
4,970✔
609
}
610

611
static int32_t processWalVerData(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamInfo, int64_t ver, bool isTrigger,
4,768✔
612
                                 int64_t uid, STimeWindow* window, SSDataBlock* pSrcBlock, SSDataBlock** pBlock) {
613
  int32_t      code = 0;
4,768✔
614
  int32_t      lino = 0;
4,768✔
615
  SFilterInfo* pFilterInfo = NULL;
4,768✔
616
  SSDataBlock* pBlock1 = NULL;
4,768✔
617
  SSDataBlock* pBlock2 = NULL;
4,768✔
618

619
  SExprInfo*   pExpr = sStreamInfo->pExprInfo;
4,768✔
620
  int32_t      numOfExpr = sStreamInfo->numOfExpr;
4,768✔
621

622
  STREAM_CHECK_RET_GOTO(filterInitFromNode(sStreamInfo->pConditions, &pFilterInfo, 0, NULL));
4,768!
623

624
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamInfo->triggerResBlock, false, &pBlock1));
4,764!
625
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamInfo->triggerResBlock, false, &pBlock2));
4,769!
626
  if (!isTrigger) STREAM_CHECK_RET_GOTO(createOneDataBlock(pSrcBlock, false, pBlock));
4,769!
627

628
  pBlock2->info.id.uid = uid;
4,769✔
629
  pBlock1->info.id.uid = uid;
4,769✔
630

631
  STREAM_CHECK_RET_GOTO(scanWalOneVer(pVnode, pBlock1, pBlock2, ver, uid, window));
4,769!
632

633
  if (pBlock2->info.rows > 0) {
4,770!
634
    SStorageAPI  api = {0};
4,770✔
635
    initStorageAPI(&api);
4,770✔
636
    STREAM_CHECK_RET_GOTO(processTag(pVnode, pExpr, numOfExpr, &api, pBlock2));
4,765!
637
  }
638
  STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock2, pFilterInfo));
4,761!
639
  if (!isTrigger) {
4,768✔
640
    blockDataTransform(*pBlock, pBlock2);
2,806✔
641
  } else {
642
    *pBlock = pBlock2;
1,962✔
643
    pBlock2 = NULL;  
1,962✔
644
  }
645

646
  printDataBlock(*pBlock, __func__, "processWalVerData2");
4,768✔
647

648
end:
4,769✔
649
  STREAM_PRINT_LOG_END(code, lino);
4,769!
650
  filterFreeInfo(pFilterInfo);
4,769✔
651
  blockDataDestroy(pBlock1);
4,767✔
652
  blockDataDestroy(pBlock2);
4,765✔
653
  return code;
4,765✔
654
}
655

656
static int32_t buildScheamFromMeta(SVnode* pVnode, int64_t uid, SArray** schemas) {
3,089✔
657
  int32_t code = 0;
3,089✔
658
  int32_t lino = 0;
3,089✔
659
  SMetaReader metaReader = {0};
3,089✔
660
  SStorageAPI api = {0};
3,089✔
661
  initStorageAPI(&api);
3,089✔
662
  *schemas = taosArrayInit(8, sizeof(SSchema));
3,089✔
663
  STREAM_CHECK_NULL_GOTO(*schemas, terrno);
3,089!
664
  
665
  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
3,089✔
666
  STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, uid));
3,089!
667

668
  SSchemaWrapper* sSchemaWrapper = NULL;
3,088✔
669
  if (metaReader.me.type == TD_CHILD_TABLE) {
3,088!
670
    int64_t suid = metaReader.me.ctbEntry.suid;
3,088✔
671
    tDecoderClear(&metaReader.coder);
3,088✔
672
    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, suid));
3,089!
673
    sSchemaWrapper = &metaReader.me.stbEntry.schemaRow;
3,087✔
UNCOV
674
  } else if (metaReader.me.type == TD_NORMAL_TABLE) {
×
UNCOV
675
    sSchemaWrapper = &metaReader.me.ntbEntry.schemaRow;
×
676
  } else {
UNCOV
677
    qError("invalid table type:%d", metaReader.me.type);
×
678
  }
679

680
  for (size_t j = 0; j < sSchemaWrapper->nCols; j++) {
19,421✔
681
    SSchema* s = sSchemaWrapper->pSchema + j;
16,334✔
682
    STREAM_CHECK_NULL_GOTO(taosArrayPush(*schemas, s), terrno);
32,668!
683
  }
684

685
end:
3,087✔
686
  api.metaReaderFn.clearReader(&metaReader);
3,087✔
687
  STREAM_PRINT_LOG_END(code, lino);
3,089!
688
  if (code != 0)  taosArrayDestroy(*schemas);
3,089!
689
  return code;
3,089✔
690
}
691

692
static int32_t shrinkScheams(SArray* cols, SArray* schemas) {
3,089✔
693
  int32_t code = 0;
3,089✔
694
  int32_t lino = 0;
3,089✔
695
  for (size_t i = 0; i < taosArrayGetSize(schemas); i++) {
19,423✔
696
    SSchema* s = taosArrayGet(schemas, i);
16,333✔
697
    STREAM_CHECK_NULL_GOTO(s, terrno);
16,336✔
698

699
    size_t j = 0;
16,335✔
700
    for (; j < taosArrayGetSize(cols); j++) {
53,503✔
701
      col_id_t* id = taosArrayGet(cols, j);
49,103✔
702
      STREAM_CHECK_NULL_GOTO(id, terrno);
49,103!
703
      if (*id == s->colId) {
49,105✔
704
        break;
11,937✔
705
      }
706
    }
707
    if (j == taosArrayGetSize(cols)) {
16,329✔
708
      // not found, remove it
709
      taosArrayRemove(schemas, i);
4,397✔
710
      i--;
4,397✔
711
    }
712
  }
713

714
end:
3,088✔
715
  return code;
3,088✔
716
}
717

718
static int32_t processWalVerDataVTable(SVnode* pVnode, SArray *cids, int64_t ver,
2,865✔
719
  int64_t uid, STimeWindow* window, SSDataBlock** pBlock) {
720
  int32_t      code = 0;
2,865✔
721
  int32_t      lino = 0;
2,865✔
722
  SArray*      schemas = NULL;
2,865✔
723

724
  SSDataBlock* pBlock1 = NULL;
2,865✔
725
  SSDataBlock* pBlock2 = NULL;
2,865✔
726

727
  STREAM_CHECK_RET_GOTO(buildScheamFromMeta(pVnode, uid, &schemas));
2,865!
728
  STREAM_CHECK_RET_GOTO(shrinkScheams(cids, schemas));
2,865!
729
  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, &pBlock1));
2,864!
730
  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, &pBlock2));
2,864!
731

732
  pBlock2->info.id.uid = uid;
2,865✔
733
  pBlock1->info.id.uid = uid;
2,865✔
734

735
  STREAM_CHECK_RET_GOTO(scanWalOneVer(pVnode, pBlock1, pBlock2, ver, uid, window));
2,865!
736
  printDataBlock(pBlock2, __func__, "");
2,865✔
737

738
  *pBlock = pBlock2;
2,865✔
739
  pBlock2 = NULL;
2,865✔
740

741
end:
2,865✔
742
  STREAM_PRINT_LOG_END(code, lino);
2,865!
743
  blockDataDestroy(pBlock1);
2,865✔
744
  blockDataDestroy(pBlock2);
2,864✔
745
  taosArrayDestroy(schemas);
2,864✔
746
  return code;
2,864✔
747
}
748

749
static int32_t createTSAndCondition(int64_t start, int64_t end, SLogicConditionNode** pCond,
233,826✔
750
                                    STargetNode* pTargetNodeTs) {
751
  int32_t code = 0;
233,826✔
752
  int32_t lino = 0;
233,826✔
753

754
  SColumnNode*         pCol = NULL;
233,826✔
755
  SColumnNode*         pCol1 = NULL;
233,826✔
756
  SValueNode*          pVal = NULL;
233,826✔
757
  SValueNode*          pVal1 = NULL;
233,826✔
758
  SOperatorNode*       op = NULL;
233,826✔
759
  SOperatorNode*       op1 = NULL;
233,826✔
760
  SLogicConditionNode* cond = NULL;
233,826✔
761

762
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pCol));
233,826!
763
  pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
233,826✔
764
  pCol->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
233,826✔
765
  pCol->node.resType.bytes = LONG_BYTES;
233,826✔
766
  pCol->slotId = pTargetNodeTs->slotId;
233,826✔
767
  pCol->dataBlockId = pTargetNodeTs->dataBlockId;
233,826✔
768

769
  STREAM_CHECK_RET_GOTO(nodesCloneNode((SNode*)pCol, (SNode**)&pCol1));
233,826!
770

771
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&pVal));
233,826!
772
  pVal->node.resType.type = TSDB_DATA_TYPE_BIGINT;
233,826✔
773
  pVal->node.resType.bytes = LONG_BYTES;
233,826✔
774
  pVal->datum.i = start;
233,826✔
775
  pVal->typeData = start;
233,826✔
776

777
  STREAM_CHECK_RET_GOTO(nodesCloneNode((SNode*)pVal, (SNode**)&pVal1));
233,826!
778
  pVal1->datum.i = end;
233,826✔
779
  pVal1->typeData = end;
233,826✔
780

781
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&op));
233,826!
782
  op->opType = OP_TYPE_GREATER_EQUAL;
233,826✔
783
  op->node.resType.type = TSDB_DATA_TYPE_BOOL;
233,826✔
784
  op->node.resType.bytes = CHAR_BYTES;
233,826✔
785
  op->pLeft = (SNode*)pCol;
233,826✔
786
  op->pRight = (SNode*)pVal;
233,826✔
787
  pCol = NULL;
233,826✔
788
  pVal = NULL;
233,826✔
789

790
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&op1));
233,826!
791
  op1->opType = OP_TYPE_LOWER_EQUAL;
233,826✔
792
  op1->node.resType.type = TSDB_DATA_TYPE_BOOL;
233,826✔
793
  op1->node.resType.bytes = CHAR_BYTES;
233,826✔
794
  op1->pLeft = (SNode*)pCol1;
233,826✔
795
  op1->pRight = (SNode*)pVal1;
233,826✔
796
  pCol1 = NULL;
233,826✔
797
  pVal1 = NULL;
233,826✔
798

799
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_LOGIC_CONDITION, (SNode**)&cond));
233,826!
800
  cond->condType = LOGIC_COND_TYPE_AND;
233,826✔
801
  cond->node.resType.type = TSDB_DATA_TYPE_BOOL;
233,826✔
802
  cond->node.resType.bytes = CHAR_BYTES;
233,826✔
803
  STREAM_CHECK_RET_GOTO(nodesMakeList(&cond->pParameterList));
233,826!
804
  STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)op));
233,826!
805
  op = NULL;
233,826✔
806
  STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)op1));
233,826!
807
  op1 = NULL;
233,826✔
808

809
  *pCond = cond;
233,826✔
810

811
end:
233,826✔
812
  if (code != 0) {
233,826!
813
    nodesDestroyNode((SNode*)pCol);
×
814
    nodesDestroyNode((SNode*)pCol1);
×
815
    nodesDestroyNode((SNode*)pVal);
×
816
    nodesDestroyNode((SNode*)pVal1);
×
817
    nodesDestroyNode((SNode*)op);
×
818
    nodesDestroyNode((SNode*)op1);
×
UNCOV
819
    nodesDestroyNode((SNode*)cond);
×
820
  }
821
  STREAM_PRINT_LOG_END(code, lino);
233,826!
822

823
  return code;
233,826✔
824
}
825

826
static int32_t createExternalConditions(SStreamRuntimeFuncInfo* data, SLogicConditionNode** pCond, STargetNode* pTargetNodeTs, STimeRangeNode* node) {
186✔
827
  int32_t              code = 0;
186✔
828
  int32_t              lino = 0;
186✔
829
  SLogicConditionNode* pAndCondition = NULL;
186✔
830
  SLogicConditionNode* cond = NULL;
186✔
831

832
  if (pTargetNodeTs == NULL) {
186!
833
    vError("stream reader %s no ts column", __func__);
×
UNCOV
834
    return TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN;
×
835
  }
836
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_LOGIC_CONDITION, (SNode**)&cond));
186!
837
  cond->condType = LOGIC_COND_TYPE_OR;
186✔
838
  cond->node.resType.type = TSDB_DATA_TYPE_BOOL;
186✔
839
  cond->node.resType.bytes = CHAR_BYTES;
186✔
840
  STREAM_CHECK_RET_GOTO(nodesMakeList(&cond->pParameterList));
186!
841

842
  for (int i = 0; i < taosArrayGetSize(data->pStreamPesudoFuncVals); ++i) {
234,012✔
843
    data->curIdx = i;
233,826✔
844

845
    SReadHandle handle = {0};
233,826✔
846
    calcTimeRange(node, data, &handle.winRange, &handle.winRangeValid);
233,826✔
847
    if (!handle.winRangeValid) {
233,826!
UNCOV
848
      stError("stream reader %s invalid time range, skey:%" PRId64 ", ekey:%" PRId64, __func__, handle.winRange.skey,
×
849
              handle.winRange.ekey);
UNCOV
850
      continue;
×
851
    }
852
    STREAM_CHECK_RET_GOTO(createTSAndCondition(handle.winRange.skey, handle.winRange.ekey, &pAndCondition, pTargetNodeTs));
233,826!
853
    stDebug("%s create condition skey:%" PRId64 ", eksy:%" PRId64, __func__, handle.winRange.skey, handle.winRange.ekey);
233,826✔
854
    STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)pAndCondition));
233,826!
855
    pAndCondition = NULL;
233,826✔
856
  }
857

858
  *pCond = cond;
186✔
859

860
end:
186✔
861
  if (code != 0) {
186!
862
    nodesDestroyNode((SNode*)pAndCondition);
×
UNCOV
863
    nodesDestroyNode((SNode*)cond);
×
864
  }
865
  STREAM_PRINT_LOG_END(code, lino);
186!
866

867
  return code;
186✔
868
}
869

870
static int32_t processCalaTimeRange(SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo, SResFetchReq* req,
15,659✔
871
                                    STimeRangeNode* node, SReadHandle* handle) {
872
  int32_t code = 0;
15,659✔
873
  int32_t lino = 0;
15,659✔
874
  SArray* funcVals = NULL;
15,659✔
875
  if (req->pStRtFuncInfo->withExternalWindow) {
15,659✔
876
    nodesDestroyNode(sStreamReaderCalcInfo->tsConditions);
186✔
877
    filterFreeInfo(sStreamReaderCalcInfo->pFilterInfo);
186✔
878
    sStreamReaderCalcInfo->pFilterInfo = NULL;
186✔
879

880
    STREAM_CHECK_RET_GOTO(createExternalConditions(req->pStRtFuncInfo,
186!
881
                                                   (SLogicConditionNode**)&sStreamReaderCalcInfo->tsConditions,
882
                                                   sStreamReaderCalcInfo->pTargetNodeTs, node));
883

884
    STREAM_CHECK_RET_GOTO(filterInitFromNode((SNode*)sStreamReaderCalcInfo->tsConditions,
186!
885
                                             (SFilterInfo**)&sStreamReaderCalcInfo->pFilterInfo,
886
                                             FLT_OPTION_NO_REWRITE | FLT_OPTION_SCALAR_MODE, NULL));
887
    SSTriggerCalcParam* pFirst = taosArrayGet(req->pStRtFuncInfo->pStreamPesudoFuncVals, 0);
186✔
888
    SSTriggerCalcParam* pLast = taosArrayGetLast(req->pStRtFuncInfo->pStreamPesudoFuncVals);
186✔
889
    STREAM_CHECK_NULL_GOTO(pFirst, terrno);
186!
890
    STREAM_CHECK_NULL_GOTO(pLast, terrno);
186!
891

892
    handle->winRange.skey = pFirst->wstart;
186✔
893
    handle->winRange.ekey = pLast->wend;
186✔
894
    handle->winRangeValid = true;
186✔
895
    stDebug("%s withExternalWindow is true, skey:%" PRId64 ", ekey:%" PRId64, __func__, pFirst->wstart, pLast->wend);
186✔
896
  } else {
897
    calcTimeRange(node, req->pStRtFuncInfo, &handle->winRange, &handle->winRangeValid);
15,473✔
898
  }
899

900
end:
15,659✔
901
  taosArrayDestroy(funcVals);
15,659✔
902
  return code;
15,659✔
903
}
904

905
static int32_t createBlockForWalMeta(SSDataBlock** pBlock, bool isVTable) {
7,350✔
906
  int32_t code = 0;
7,350✔
907
  int32_t lino = 0;
7,350✔
908
  SArray* schemas = NULL;
7,350✔
909

910
  schemas = taosArrayInit(8, sizeof(SSchema));
7,350✔
911
  STREAM_CHECK_NULL_GOTO(schemas, terrno);
7,355!
912

913
  int32_t index = 0;
7,355✔
914
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TINYINT, CHAR_BYTES, index++))  // type
7,355!
915
  if (!isVTable) {
7,355✔
916
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // gid
6,475!
917
  }
918
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // uid
7,364!
919
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // skey
7,364!
920
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // ekey
7,363!
921
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // ver
7,360!
922
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // nrows
7,361!
923

924
  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, pBlock));
7,361!
925

926
end:
7,368✔
927
  taosArrayDestroy(schemas);
7,368✔
928
  return code;
7,368✔
929
}
930

931
static int32_t createOptionsForLastTs(SStreamTriggerReaderTaskInnerOptions* options,
343✔
932
                                      SStreamTriggerReaderInfo*             sStreamReaderInfo) {
933
  int32_t code = 0;
343✔
934
  int32_t lino = 0;
343✔
935
  SArray* schemas = NULL;
343✔
936

937
  schemas = taosArrayInit(4, sizeof(SSchema));
343✔
938
  STREAM_CHECK_NULL_GOTO(schemas, terrno)
343!
939
  STREAM_CHECK_RET_GOTO(
343!
940
      qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, PRIMARYKEY_TIMESTAMP_COL_ID))  // last ts
941

942
  BUILD_OPTION(op, sStreamReaderInfo, -1, true, TSDB_ORDER_DESC, INT64_MIN, INT64_MAX, schemas, true,
343✔
943
               STREAM_SCAN_GROUP_ONE_BY_ONE, 0, sStreamReaderInfo->uidList == NULL, NULL);
944
  schemas = NULL;
343✔
945
  *options = op;
343✔
946

947
end:
343✔
948
  taosArrayDestroy(schemas);
343✔
949
  return code;
343✔
950
}
951

952
static int32_t createOptionsForFirstTs(SStreamTriggerReaderTaskInnerOptions* options,
328✔
953
                                       SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t start, int64_t ver) {
954
  int32_t code = 0;
328✔
955
  int32_t lino = 0;
328✔
956
  SArray* schemas = NULL;
328✔
957

958
  schemas = taosArrayInit(4, sizeof(SSchema));
328✔
959
  STREAM_CHECK_NULL_GOTO(schemas, terrno)
328!
960
  STREAM_CHECK_RET_GOTO(
328!
961
      qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, PRIMARYKEY_TIMESTAMP_COL_ID))  // first ts
962

963
  BUILD_OPTION(op, sStreamReaderInfo, ver, true, TSDB_ORDER_ASC, start, INT64_MAX, schemas, true,
328✔
964
               STREAM_SCAN_GROUP_ONE_BY_ONE, 0, sStreamReaderInfo->uidList == NULL, NULL);
965
  schemas = NULL;
328✔
966

967
  *options = op;
328✔
968
end:
328✔
969
  taosArrayDestroy(schemas);
328✔
970
  return code;
328✔
971
}
972

973
static int32_t createOptionsForTsdbMeta(SStreamTriggerReaderTaskInnerOptions* options,
626✔
974
                                        SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t start, int64_t end,
975
                                        int64_t gid, int8_t order, int64_t ver, bool onlyTs) {
976
  int32_t code = 0;
626✔
977
  int32_t lino = 0;
626✔
978
  SArray* schemas = NULL;
626✔
979

980
  int32_t index = 1;
626✔
981
  schemas = taosArrayInit(8, sizeof(SSchema));
626✔
982
  STREAM_CHECK_NULL_GOTO(schemas, terrno);
626!
983
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, index++))  // skey
626!
984
  if (!onlyTs){
626✔
985
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, index++))  // ekey
313!
986
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // uid
313!
987
    if (sStreamReaderInfo->uidList == NULL) {
313✔
988
      STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_UBIGINT, LONG_BYTES, index++))  // gid
101!
989
    }
990
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))     // nrows
313!
991
  }
992
  
993
  BUILD_OPTION(op, sStreamReaderInfo, ver, true, order, start, end, schemas, true, (gid != 0 ? STREAM_SCAN_GROUP_ONE_BY_ONE : STREAM_SCAN_ALL), gid,
626✔
994
               true, sStreamReaderInfo->uidList);
995
  schemas = NULL;
626✔
996
  *options = op;
626✔
997

998
end:
626✔
999
  taosArrayDestroy(schemas);
626✔
1000
  return code;
626✔
1001
}
1002

1003
static int taosCompareInt64Asc(const void* elem1, const void* elem2) {
57✔
1004
  int64_t* node1 = (int64_t*)elem1;
57✔
1005
  int64_t* node2 = (int64_t*)elem2;
57✔
1006

1007
  if (*node1 < *node2) {
57!
UNCOV
1008
    return -1;
×
1009
  }
1010

1011
  return *node1 > *node2;
57✔
1012
}
1013

1014
static int32_t getTableList(SArray** pList, int32_t* pNum, int64_t* suid, int32_t index,
89✔
1015
                            SStreamTriggerReaderInfo* sStreamReaderInfo) {
1016
  int32_t code = 0;
89✔
1017
  int32_t lino = 0;
89✔
1018

1019
  int32_t* start = taosArrayGet(sStreamReaderInfo->uidListIndex, index);
89✔
1020
  STREAM_CHECK_NULL_GOTO(start, terrno);
89!
1021
  int32_t  iStart = *start;
89✔
1022
  int32_t* end = taosArrayGet(sStreamReaderInfo->uidListIndex, index + 1);
89✔
1023
  STREAM_CHECK_NULL_GOTO(end, terrno);
89!
1024
  int32_t iEnd = *end;
89✔
1025
  *pList = taosArrayInit(iEnd - iStart, sizeof(STableKeyInfo));
89✔
1026
  STREAM_CHECK_NULL_GOTO(*pList, terrno);
89!
1027
  for (int32_t i = iStart; i < iEnd; ++i) {
268✔
1028
    int64_t*       uid = taosArrayGet(sStreamReaderInfo->uidList, i);
179✔
1029
    STableKeyInfo* info = taosArrayReserve(*pList, 1);
179✔
1030
    STREAM_CHECK_NULL_GOTO(info, terrno);
179!
1031
    *suid = uid[0];
179✔
1032
    info->uid = uid[1];
179✔
1033
  }
1034
  *pNum = iEnd - iStart;
89✔
1035
end:
89✔
1036
  STREAM_PRINT_LOG_END(code, lino);
89!
1037
  return code;
89✔
1038
}
1039

1040
static int32_t processTsNonVTable(SVnode* pVnode, SStreamTsResponse* tsRsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
581✔
1041
                                  SStreamReaderTaskInner* pTask) {
1042
  int32_t code = 0;
581✔
1043
  int32_t lino = 0;
581✔
1044

1045
  tsRsp->tsInfo = taosArrayInit(qStreamGetTableListGroupNum(pTask->pTableList), sizeof(STsInfo));
581✔
1046
  STREAM_CHECK_NULL_GOTO(tsRsp->tsInfo, terrno);
581!
1047
  while (true) {
576✔
1048
    bool hasNext = false;
1,157✔
1049
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTask, &hasNext));
1,157!
1050
    if (hasNext) {
1,157✔
1051
      pTask->api.tsdReader.tsdReaderReleaseDataBlock(pTask->pReader);
680✔
1052
      STsInfo* tsInfo = taosArrayReserve(tsRsp->tsInfo, 1);
680✔
1053
      STREAM_CHECK_NULL_GOTO(tsInfo, terrno)
680!
1054
      if (pTask->options.order == TSDB_ORDER_ASC) {
680✔
1055
        tsInfo->ts = pTask->pResBlock->info.window.skey;
551✔
1056
      } else {
1057
        tsInfo->ts = pTask->pResBlock->info.window.ekey;
129✔
1058
      }
1059
      tsInfo->gId = qStreamGetGroupId(pTask->pTableList, pTask->pResBlock->info.id.uid);
680✔
1060
      stDebug("vgId:%d %s get last ts:%" PRId64 ", gId:%" PRIu64 ", ver:%" PRId64, TD_VID(pVnode), __func__, tsInfo->ts,
680✔
1061
              tsInfo->gId, tsRsp->ver);
1062
    }
1063

1064
    pTask->currentGroupIndex++;
1,157✔
1065
    if (pTask->currentGroupIndex >= qStreamGetTableListGroupNum(pTask->pTableList)) {
1,157✔
1066
      break;
582✔
1067
    }
1068
    STREAM_CHECK_RET_GOTO(resetTsdbReader(pTask));
574!
1069
  }
1070

1071
end:
582✔
1072
  return code;
582✔
1073
}
1074

1075
static int32_t processTsVTable(SVnode* pVnode, SStreamTsResponse* tsRsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
89✔
1076
                               SStreamReaderTaskInner* pTask, int64_t ver) {
1077
  int32_t code = 0;
89✔
1078
  int32_t lino = 0;
89✔
1079
  int64_t suid = 0;
89✔
1080
  SArray* pList = NULL;
89✔
1081
  int32_t pNum = 0;
89✔
1082

1083
  tsRsp->tsInfo = taosArrayInit(taosArrayGetSize(sStreamReaderInfo->uidList), sizeof(STsInfo));
89✔
1084
  STREAM_CHECK_NULL_GOTO(tsRsp->tsInfo, terrno);
89!
1085
  for (int32_t i = 0; i < taosArrayGetSize(sStreamReaderInfo->uidListIndex) - 1; ++i) {
178✔
1086
    STREAM_CHECK_RET_GOTO(getTableList(&pList, &pNum, &suid, i, sStreamReaderInfo));
89!
1087

1088
    cleanupQueryTableDataCond(&pTask->cond);
89✔
1089
    STREAM_CHECK_RET_GOTO(qStreamInitQueryTableDataCond(&pTask->cond, pTask->options.order, pTask->options.schemas,
89!
1090
                                                        pTask->options.isSchema, pTask->options.twindows, suid, ver));
1091
    STREAM_CHECK_RET_GOTO(pTask->api.tsdReader.tsdReaderOpen(
89!
1092
        pVnode, &pTask->cond, taosArrayGet(pList, 0), pNum, pTask->pResBlock, (void**)&pTask->pReader, pTask->idStr, NULL));
1093
    taosArrayDestroy(pList);
89✔
1094
    pList = NULL;
89✔
1095
    while (true) {
131✔
1096
      bool hasNext = false;
220✔
1097
      STREAM_CHECK_RET_GOTO(getTableDataInfo(pTask, &hasNext));
220!
1098
      if (!hasNext) {
220✔
1099
        break;
89✔
1100
      }
1101
      pTask->api.tsdReader.tsdReaderReleaseDataBlock(pTask->pReader);
131✔
1102
      STsInfo* tsInfo = taosArrayReserve(tsRsp->tsInfo, 1);
131✔
1103
      STREAM_CHECK_NULL_GOTO(tsInfo, terrno)
131!
1104
      if (pTask->options.order == TSDB_ORDER_ASC) {
131✔
1105
        tsInfo->ts = pTask->pResBlock->info.window.skey;
62✔
1106
      } else {
1107
        tsInfo->ts = pTask->pResBlock->info.window.ekey;
69✔
1108
      }
1109
      tsInfo->gId = pTask->pResBlock->info.id.uid;
131✔
1110
      stDebug("vgId:%d %s get vtable last ts:%" PRId64 ", uid:%" PRIu64 ", ver:%" PRId64, TD_VID(pVnode), __func__,
131✔
1111
              tsInfo->ts, tsInfo->gId, tsRsp->ver);
1112
    }
1113
    pTask->api.tsdReader.tsdReaderClose(pTask->pReader);
89✔
1114
    pTask->pReader = NULL;
89✔
1115
  }
1116

1117
end:
89✔
1118
  taosArrayDestroy(pList);
89✔
1119
  return code;
89✔
1120
}
1121

1122
static void reSetUid(SStreamTriggerReaderTaskInnerOptions* options, int64_t suid, int64_t uid) {
256✔
1123
  if (suid != 0) options->suid = suid;
256✔
1124
  options->uid = uid;
256✔
1125
  if (options->suid != 0) {
256✔
1126
    options->tableType = TD_CHILD_TABLE;
253✔
1127
  } else {
1128
    options->tableType = TD_NORMAL_TABLE;
3✔
1129
  }
1130
}
256✔
1131

1132
static int32_t createOptionsForTsdbData(SVnode* pVnode, SStreamTriggerReaderTaskInnerOptions* options,
224✔
1133
                                        SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t uid, SArray* cols,
1134
                                        int8_t order,int64_t skey, int64_t ekey, int64_t ver) {
1135
  int32_t code = 0;
224✔
1136
  int32_t lino = 0;
224✔
1137
  SArray* schemas = NULL;
224✔
1138

1139
  STREAM_CHECK_RET_GOTO(buildScheamFromMeta(pVnode, uid, &schemas));
224!
1140
  STREAM_CHECK_RET_GOTO(shrinkScheams(cols, schemas));
224!
1141
  BUILD_OPTION(op, sStreamReaderInfo, ver, true, order, skey, ekey, schemas, true, STREAM_SCAN_ALL, 0, false, NULL);
224✔
1142
  *options = op;
224✔
1143

1144
end:
224✔
1145
  return code;
224✔
1146
}
1147

1148
static int32_t vnodeProcessStreamSetTableReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
65✔
1149
  int32_t code = 0;
65✔
1150
  int32_t lino = 0;
65✔
1151
  void*   buf = NULL;
65✔
1152
  size_t  size = 0;
65✔
1153
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
65!
1154
  void* pTask = sStreamReaderInfo->pTask;
65✔
1155

1156
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
65✔
1157

1158
  TSWAP(sStreamReaderInfo->uidList, req->setTableReq.uids);
65✔
1159
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidList, TSDB_CODE_INVALID_PARA);
65!
1160

1161
  taosArraySort(sStreamReaderInfo->uidList, taosCompareInt64Asc);
65✔
1162
  if (sStreamReaderInfo->uidListIndex != NULL) {
66!
1163
    taosArrayClear(sStreamReaderInfo->uidListIndex);
×
1164
  } else {
1165
    sStreamReaderInfo->uidListIndex = taosArrayInit(taosArrayGetSize(sStreamReaderInfo->uidList), sizeof(int32_t));
66✔
1166
    STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidListIndex, TSDB_CODE_INVALID_PARA);
66!
1167
  }
1168

1169
  if (sStreamReaderInfo->uidHash != NULL) {
66!
1170
    taosHashClear(sStreamReaderInfo->uidHash);
×
1171
  } else {
1172
    sStreamReaderInfo->uidHash = taosHashInit(taosArrayGetSize(sStreamReaderInfo->uidList),
66✔
1173
                                              taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
1174
    STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidHash, TSDB_CODE_INVALID_PARA);
66!
1175
  }
1176

1177
  int64_t suid = 0;
66✔
1178
  int32_t cnt = taosArrayGetSize(sStreamReaderInfo->uidList);
66✔
1179
  for (int32_t i = 0; i < cnt; ++i) {
177✔
1180
    int64_t* data = taosArrayGet(sStreamReaderInfo->uidList, i);
111✔
1181
    STREAM_CHECK_NULL_GOTO(data, terrno);
111!
1182
    if (*data == 0 || *data != suid) {
111✔
1183
      STREAM_CHECK_NULL_GOTO(taosArrayPush(sStreamReaderInfo->uidListIndex, &i), terrno);
132!
1184
    }
1185
    suid = *data;
111✔
1186
    stDebug("vgId:%d %s suid:%" PRId64 ",uid:%" PRId64 ", index:%d", TD_VID(pVnode), __func__, suid, data[1], i);
111✔
1187
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->uidHash, data + 1, LONG_BYTES, &i, sizeof(int32_t)));
111!
1188
  }
1189
  STREAM_CHECK_NULL_GOTO(taosArrayPush(sStreamReaderInfo->uidListIndex, &cnt), terrno);
132!
1190

1191
end:
66✔
1192
  STREAM_PRINT_LOG_END_WITHID(code, lino);
66!
1193
  SRpcMsg rsp = {
66✔
1194
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1195
  tmsgSendRsp(&rsp);
66✔
1196
  return code;
66✔
1197
}
1198

1199
static int32_t vnodeProcessStreamLastTsReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
339✔
1200
  int32_t                 code = 0;
339✔
1201
  int32_t                 lino = 0;
339✔
1202
  SArray*                 schemas = NULL;
339✔
1203
  SStreamReaderTaskInner* pTaskInner = NULL;
339✔
1204
  SStreamTsResponse       lastTsRsp = {0};
339✔
1205
  void*                   buf = NULL;
339✔
1206
  size_t                  size = 0;
339✔
1207

1208
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
339!
1209
  void* pTask = sStreamReaderInfo->pTask;
339✔
1210

1211
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
339✔
1212

1213
  SStreamTriggerReaderTaskInnerOptions options = {0};
339✔
1214
  STREAM_CHECK_RET_GOTO(createOptionsForLastTs(&options, sStreamReaderInfo));
339!
1215
  SStorageAPI api = {0};
343✔
1216
  initStorageAPI(&api);
343✔
1217
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, NULL, &api));
343!
1218

1219
  lastTsRsp.ver = pVnode->state.applied;
342✔
1220
  if (sStreamReaderInfo->uidList != NULL) {
342✔
1221
    STREAM_CHECK_RET_GOTO(processTsVTable(pVnode, &lastTsRsp, sStreamReaderInfo, pTaskInner, -1));
66!
1222
  } else {
1223
    STREAM_CHECK_RET_GOTO(processTsNonVTable(pVnode, &lastTsRsp, sStreamReaderInfo, pTaskInner));
276!
1224
  }
1225
  ST_TASK_DLOG("vgId:%d %s get result", TD_VID(pVnode), __func__);
343✔
1226
  STREAM_CHECK_RET_GOTO(buildTsRsp(&lastTsRsp, &buf, &size))
343!
1227

1228
end:
343✔
1229
  STREAM_PRINT_LOG_END_WITHID(code, lino);
343!
1230
  SRpcMsg rsp = {
343✔
1231
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1232
  tmsgSendRsp(&rsp);
343✔
1233
  taosArrayDestroy(lastTsRsp.tsInfo);
343✔
1234
  releaseStreamTask(&pTaskInner);
343✔
1235
  return code;
343✔
1236
}
1237

1238
static int32_t vnodeProcessStreamFirstTsReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
328✔
1239
  int32_t                 code = 0;
328✔
1240
  int32_t                 lino = 0;
328✔
1241
  SStreamReaderTaskInner* pTaskInner = NULL;
328✔
1242
  SStreamTsResponse       firstTsRsp = {0};
328✔
1243
  void*                   buf = NULL;
328✔
1244
  size_t                  size = 0;
328✔
1245

1246
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
328!
1247
  void* pTask = sStreamReaderInfo->pTask;
328✔
1248
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
328✔
1249
  SStreamTriggerReaderTaskInnerOptions options = {0};
328✔
1250
  STREAM_CHECK_RET_GOTO(createOptionsForFirstTs(&options, sStreamReaderInfo, req->firstTsReq.startTime, req->firstTsReq.ver));
328!
1251
  SStorageAPI api = {0};
328✔
1252
  initStorageAPI(&api);
328✔
1253
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, NULL, &api));
328!
1254
  
1255
  firstTsRsp.ver = pVnode->state.applied;
328✔
1256
  if (sStreamReaderInfo->uidList != NULL) {
328✔
1257
    STREAM_CHECK_RET_GOTO(processTsVTable(pVnode, &firstTsRsp, sStreamReaderInfo, pTaskInner, req->firstTsReq.ver));
23!
1258
  } else {
1259
    STREAM_CHECK_RET_GOTO(processTsNonVTable(pVnode, &firstTsRsp, sStreamReaderInfo, pTaskInner));
305!
1260
  }
1261

1262
  ST_TASK_DLOG("vgId:%d %s get result", TD_VID(pVnode), __func__);
328✔
1263
  STREAM_CHECK_RET_GOTO(buildTsRsp(&firstTsRsp, &buf, &size));
328!
1264

1265
end:
328✔
1266
  STREAM_PRINT_LOG_END_WITHID(code, lino);
328!
1267
  SRpcMsg rsp = {
328✔
1268
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1269
  tmsgSendRsp(&rsp);
328✔
1270
  taosArrayDestroy(firstTsRsp.tsInfo);
328✔
1271
  releaseStreamTask(&pTaskInner);
328✔
1272
  return code;
328✔
1273
}
1274

1275
static int32_t vnodeProcessStreamTsdbMetaReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
313✔
1276
  int32_t code = 0;
313✔
1277
  int32_t lino = 0;
313✔
1278
  void*   buf = NULL;
313✔
1279
  size_t  size = 0;
313✔
1280
  SStreamTriggerReaderTaskInnerOptions options = {0};
313✔
1281

1282
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
313!
1283
  void* pTask = sStreamReaderInfo->pTask;
313✔
1284
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
313✔
1285

1286
  SStreamReaderTaskInner* pTaskInner = NULL;
313✔
1287
  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_META);
313✔
1288

1289
  if (req->base.type == STRIGGER_PULL_TSDB_META) {
313!
1290
    SStreamTriggerReaderTaskInnerOptions optionsTs = {0};
313✔
1291

1292
    STREAM_CHECK_RET_GOTO(createOptionsForTsdbMeta(&optionsTs, sStreamReaderInfo, req->tsdbMetaReq.startTime,
313!
1293
      req->tsdbMetaReq.endTime, req->tsdbMetaReq.gid, req->tsdbMetaReq.order, req->tsdbMetaReq.ver, true));
1294
    SStorageAPI api = {0};
313✔
1295
    initStorageAPI(&api);
313✔
1296
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &optionsTs, &pTaskInner, NULL, sStreamReaderInfo->groupIdMap, &api));
313!
1297
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
313!
1298
    
1299
    STREAM_CHECK_RET_GOTO(createOptionsForTsdbMeta(&options, sStreamReaderInfo, req->tsdbMetaReq.startTime,
313!
1300
      req->tsdbMetaReq.endTime, req->tsdbMetaReq.gid, req->tsdbMetaReq.order, req->tsdbMetaReq.ver, false));
1301
    STREAM_CHECK_RET_GOTO(createDataBlockForStream(options.schemas, &pTaskInner->pResBlockDst));
313!
1302
  } else {
1303
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
×
1304
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
×
1305
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
×
1306
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
×
1307
  }
1308

1309
  blockDataCleanup(pTaskInner->pResBlockDst);
313✔
1310
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pTaskInner->pResBlockDst, STREAM_RETURN_ROWS_NUM));
313!
1311
  bool hasNext = true;
313✔
1312
  while (true) {
122✔
1313
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
435!
1314
    if (!hasNext) {
435✔
1315
      break;
313✔
1316
    }
1317
    pTaskInner->api.tsdReader.tsdReaderReleaseDataBlock(pTaskInner->pReader);
122✔
1318
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
122✔
1319

1320
    int32_t index = 0;
122✔
1321
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.window.skey));
122!
1322
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.window.ekey));
122!
1323
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.id.uid));
122!
1324
    if (sStreamReaderInfo->uidList == NULL) {
122✔
1325
      STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.id.groupId));
31!
1326
    }
1327
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.rows));
122!
1328

1329
    stDebug("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
122✔
1330
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
1331
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
1332
            pTaskInner->pResBlockDst->info.rows++;
122✔
1333
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
122!
1334
      break;
×
1335
    }
1336
  }
1337

1338
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
313✔
1339
  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
313!
1340
  if (!hasNext) {
313!
1341
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
313!
1342
  }
1343

1344
end:
313✔
1345
  STREAM_PRINT_LOG_END_WITHID(code, lino);
313!
1346
  SRpcMsg rsp = {
313✔
1347
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1348
  tmsgSendRsp(&rsp);
313✔
1349
  taosArrayDestroy(options.schemas);
313✔
1350
  return code;
313✔
1351
}
1352

1353
static int32_t vnodeProcessStreamTsdbTsDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
32✔
1354
  int32_t                 code = 0;
32✔
1355
  int32_t                 lino = 0;
32✔
1356
  SStreamReaderTaskInner* pTaskInner = NULL;
32✔
1357
  void*                   buf = NULL;
32✔
1358
  size_t                  size = 0;
32✔
1359
  SSDataBlock*            pBlockRes = NULL;
32✔
1360

1361
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
32!
1362
  void* pTask = sStreamReaderInfo->pTask;
32✔
1363
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
32!
1364

1365
  BUILD_OPTION(options, sStreamReaderInfo, req->tsdbTsDataReq.ver, true, TSDB_ORDER_ASC, req->tsdbTsDataReq.skey, req->tsdbTsDataReq.ekey,
32✔
1366
               sStreamReaderInfo->triggerCols, false, STREAM_SCAN_ALL, 0, true, NULL);
1367
  reSetUid(&options, req->tsdbTsDataReq.suid, req->tsdbTsDataReq.uid);
32✔
1368
  SStorageAPI api = {0};
32✔
1369
  initStorageAPI(&api);
32✔
1370
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, NULL, &api));
32!
1371
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
32!
1372
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->tsBlock, false, &pBlockRes));
32!
1373

1374
  while (1) {
32✔
1375
    bool hasNext = false;
64✔
1376
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
64!
1377
    if (!hasNext) {
64✔
1378
      break;
32✔
1379
    }
1380
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
32✔
1381

1382
    SSDataBlock* pBlock = NULL;
32✔
1383
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
32!
1384
    if (pBlock != NULL && pBlock->info.rows > 0) {
32!
1385
      STREAM_CHECK_RET_GOTO(processTag(pVnode, sStreamReaderInfo->pExprInfo, sStreamReaderInfo->numOfExpr, &api, pBlock));
32!
1386
    }
1387
    
1388
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, pTaskInner->pFilterInfo));
32!
1389
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
32!
1390
    ST_TASK_DLOG("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
32!
1391
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
1392
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
1393
  }
1394

1395
  blockDataTransform(pBlockRes, pTaskInner->pResBlockDst);
32✔
1396

1397
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
32!
1398
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));
32!
1399

1400
end:
32✔
1401
  STREAM_PRINT_LOG_END_WITHID(code, lino);
32!
1402
  SRpcMsg rsp = {
32✔
1403
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1404
  tmsgSendRsp(&rsp);
32✔
1405
  blockDataDestroy(pBlockRes);
32✔
1406

1407
  releaseStreamTask(&pTaskInner);
32✔
1408
  return code;
32✔
1409
}
1410

1411
static int32_t vnodeProcessStreamTsdbTriggerDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
340✔
1412
  int32_t code = 0;
340✔
1413
  int32_t lino = 0;
340✔
1414
  void*   buf = NULL;
340✔
1415
  size_t  size = 0;
340✔
1416

1417
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
340!
1418
  SStreamReaderTaskInner* pTaskInner = NULL;
340✔
1419
  void* pTask = sStreamReaderInfo->pTask;
340✔
1420
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
340✔
1421
  
1422
  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_TRIGGER_DATA);
340✔
1423

1424
  if (req->base.type == STRIGGER_PULL_TSDB_TRIGGER_DATA) {
340✔
1425
    BUILD_OPTION(options, sStreamReaderInfo, req->tsdbTriggerDataReq.ver, true, req->tsdbTriggerDataReq.order, req->tsdbTriggerDataReq.startTime, INT64_MAX,
164✔
1426
                 sStreamReaderInfo->triggerCols, false, (req->tsdbTriggerDataReq.gid != 0 ? STREAM_SCAN_GROUP_ONE_BY_ONE : STREAM_SCAN_ALL), 
1427
                 req->tsdbTriggerDataReq.gid, true, NULL);
1428
    SStorageAPI api = {0};
164✔
1429
    initStorageAPI(&api);
164✔
1430
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock,
164!
1431
                                           sStreamReaderInfo->groupIdMap, &api));
1432
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
164!
1433
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
164!
1434
  } else {
1435
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
176✔
1436
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
176!
1437
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
176✔
1438
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
176!
1439
  }
1440

1441
  blockDataCleanup(pTaskInner->pResBlockDst);
340✔
1442
  bool hasNext = true;
340✔
UNCOV
1443
  while (1) {
×
1444
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
340!
1445
    if (!hasNext) {
340✔
1446
      break;
164✔
1447
    }
1448
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
176✔
1449
    pTaskInner->pResBlockDst->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
176✔
1450

1451
    SSDataBlock* pBlock = NULL;
176✔
1452
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
176!
1453
    if (pBlock != NULL && pBlock->info.rows > 0) {
176!
1454
      STREAM_CHECK_RET_GOTO(
176!
1455
        processTag(pVnode, sStreamReaderInfo->pExprInfo, sStreamReaderInfo->numOfExpr, &pTaskInner->api, pBlock));
1456
    }
1457
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, pTaskInner->pFilterInfo));
176!
1458
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
176!
1459
    ST_TASK_DLOG("vgId:%d %s get skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
176✔
1460
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
1461
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
1462
    if (pTaskInner->pResBlockDst->info.rows >= 0) { //todo
176!
1463
      break;
176✔
1464
    }
1465
  }
1466

1467
  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
340!
1468
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
340✔
1469
  if (!hasNext) {
340✔
1470
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
164!
1471
  }
1472

1473
end:
340✔
1474
  STREAM_PRINT_LOG_END_WITHID(code, lino);
340!
1475
  SRpcMsg rsp = {
340✔
1476
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1477
  tmsgSendRsp(&rsp);
340✔
1478

1479
  return code;
340✔
1480
}
1481

1482
static int32_t vnodeProcessStreamTsdbCalcDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
28,368✔
1483
  int32_t code = 0;
28,368✔
1484
  int32_t lino = 0;
28,368✔
1485
  void*   buf = NULL;
28,368✔
1486
  size_t  size = 0;
28,368✔
1487
  SSDataBlock*            pBlockRes = NULL;
28,368✔
1488

1489
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
28,368!
1490
  void* pTask = sStreamReaderInfo->pTask;
28,368✔
1491
  ST_TASK_DLOG("vgId:%d %s start, skey:%"PRId64",ekey:%"PRId64",gid:%"PRId64, TD_VID(pVnode), __func__, 
28,368✔
1492
    req->tsdbCalcDataReq.skey, req->tsdbCalcDataReq.ekey, req->tsdbCalcDataReq.gid);
1493

1494
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->triggerCols, TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN);
28,368!
1495

1496
  SStreamReaderTaskInner* pTaskInner = NULL;
28,368✔
1497
  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_CALC_DATA);
28,368✔
1498

1499
  if (req->base.type == STRIGGER_PULL_TSDB_CALC_DATA) {
28,367!
1500
    BUILD_OPTION(options, sStreamReaderInfo, req->tsdbCalcDataReq.ver, true, TSDB_ORDER_ASC, req->tsdbCalcDataReq.skey, req->tsdbCalcDataReq.ekey,
28,367✔
1501
                 sStreamReaderInfo->triggerCols, false, STREAM_SCAN_GROUP_ONE_BY_ONE, req->tsdbCalcDataReq.gid, true, NULL);
1502
    SStorageAPI api = {0};
28,367✔
1503
    initStorageAPI(&api);
28,367✔
1504
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, NULL, &api));
28,368✔
1505

1506
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
28,290!
1507
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
28,293!
1508
  } else {
UNCOV
1509
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
×
UNCOV
1510
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
×
UNCOV
1511
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
×
1512
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
×
1513
  }
1514

1515
  blockDataCleanup(pTaskInner->pResBlockDst);
28,293✔
1516
  bool hasNext = true;
28,293✔
1517
  while (1) {
2,571✔
1518
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
30,864!
1519
    if (!hasNext) {
30,864✔
1520
      break;
28,293✔
1521
    }
1522
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
2,571✔
1523

1524
    SSDataBlock* pBlock = NULL;
2,570✔
1525
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
2,570!
1526
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, pTaskInner->pFilterInfo));
2,570!
1527
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
2,570!
1528
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
2,571!
UNCOV
1529
      break;
×
1530
    }
1531
  }
1532

1533
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->calcResBlock, false, &pBlockRes));
28,293!
1534
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlockRes, pTaskInner->pResBlockDst->info.capacity));
28,290!
1535
  blockDataTransform(pBlockRes, pTaskInner->pResBlockDst);
28,290✔
1536
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));
28,288!
1537
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlockRes->info.rows);
28,286✔
1538
  if (!hasNext) {
28,293!
1539
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
28,293!
1540
  }
1541

1542
end:
28,293✔
1543
  STREAM_PRINT_LOG_END_WITHID(code, lino);
28,368!
1544
  SRpcMsg rsp = {
28,368✔
1545
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1546
  tmsgSendRsp(&rsp);
28,368✔
1547
  blockDataDestroy(pBlockRes);
28,368✔
1548
  return code;
28,365✔
1549
}
1550

1551
static int32_t vnodeProcessStreamTsdbVirtalDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
224✔
1552
  int32_t code = 0;
224✔
1553
  int32_t lino = 0;
224✔
1554
  void*   buf = NULL;
224✔
1555
  size_t  size = 0;
224✔
1556

1557
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
224!
1558
  void* pTask = sStreamReaderInfo->pTask;
224✔
1559
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
224✔
1560

1561
  SStreamReaderTaskInner* pTaskInner = NULL;
224✔
1562
  int64_t key = req->tsdbDataReq.uid;
224✔
1563

1564
  if (req->base.type == STRIGGER_PULL_TSDB_DATA) {
224!
1565
    SStreamTriggerReaderTaskInnerOptions options = {0};
224✔
1566

1567
    STREAM_CHECK_RET_GOTO(createOptionsForTsdbData(pVnode, &options, sStreamReaderInfo, req->tsdbDataReq.uid,
224!
1568
                                                   req->tsdbDataReq.cids, req->tsdbDataReq.order, req->tsdbDataReq.skey,
1569
                                                   req->tsdbDataReq.ekey, req->tsdbDataReq.ver));
1570
    reSetUid(&options, req->tsdbDataReq.suid, req->tsdbDataReq.uid);
224✔
1571

1572
    SStorageAPI api = {0};
224✔
1573
    initStorageAPI(&api);
224✔
1574
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, NULL, &api));
224!
1575

1576
    STableKeyInfo       keyInfo = {.uid = req->tsdbDataReq.uid};
224✔
1577
    cleanupQueryTableDataCond(&pTaskInner->cond);
224✔
1578
    STREAM_CHECK_RET_GOTO(qStreamInitQueryTableDataCond(&pTaskInner->cond, pTaskInner->options.order, pTaskInner->options.schemas,
224!
1579
                                                        pTaskInner->options.isSchema, pTaskInner->options.twindows,
1580
                                                        pTaskInner->options.suid, pTaskInner->options.ver));
1581
    STREAM_CHECK_RET_GOTO(pTaskInner->api.tsdReader.tsdReaderOpen(pVnode, &pTaskInner->cond, &keyInfo, 1, pTaskInner->pResBlock,
224!
1582
                                                             (void**)&pTaskInner->pReader, pTaskInner->idStr, NULL));
1583

1584
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
224!
1585
    STREAM_CHECK_RET_GOTO(createOneDataBlock(pTaskInner->pResBlock, false, &pTaskInner->pResBlockDst));
224!
1586
  } else {
1587
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
×
1588
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
×
1589
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
×
1590
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
×
1591
  }
1592

1593
  blockDataCleanup(pTaskInner->pResBlockDst);
224✔
1594
  bool hasNext = true;
224✔
1595
  while (1) {
224✔
1596
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
448!
1597
    if (!hasNext) {
448✔
1598
      break;
224✔
1599
    }
1600

1601
    SSDataBlock* pBlock = NULL;
224✔
1602
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
224!
1603
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
224!
1604
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
224!
1605
      break;
×
1606
    }
1607
  }
1608
  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
224!
1609
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
224✔
1610
  if (!hasNext) {
224!
1611
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
224!
1612
  }
1613

1614
end:
224✔
1615
  STREAM_PRINT_LOG_END_WITHID(code, lino);
224!
1616
  SRpcMsg rsp = {
224✔
1617
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1618
  tmsgSendRsp(&rsp);
224✔
1619

1620
  return code;
224✔
1621
}
1622

1623
static int32_t vnodeProcessStreamWalMetaReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
7,363✔
1624
  int32_t      code = 0;
7,363✔
1625
  int32_t      lino = 0;
7,363✔
1626
  void*        buf = NULL;
7,363✔
1627
  size_t       size = 0;
7,363✔
1628
  SSDataBlock* pBlock = NULL;
7,363✔
1629
  void*        pTableList = NULL;
7,363✔
1630
  SNodeList*   groupNew = NULL;
7,363✔
1631
  int64_t      lastVer = 0;
7,363✔
1632

1633
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
7,363✔
1634
  void* pTask = sStreamReaderInfo->pTask;
7,341✔
1635
  ST_TASK_DLOG("vgId:%d %s start, request paras lastVer:%" PRId64 ",ctime:%" PRId64, TD_VID(pVnode), __func__,
7,341✔
1636
  req->walMetaReq.lastVer, req->walMetaReq.ctime);
1637

1638
  bool isVTable = sStreamReaderInfo->uidList != NULL;
7,341✔
1639
  if (sStreamReaderInfo->uidList == NULL) {
7,341✔
1640
    SStorageAPI api = {0};
6,468✔
1641
    initStorageAPI(&api);
6,468✔
1642
    STREAM_CHECK_RET_GOTO(nodesCloneList(sStreamReaderInfo->partitionCols, &groupNew));
6,478!
1643
    STREAM_CHECK_RET_GOTO(qStreamCreateTableListForReader(
6,468!
1644
        pVnode, sStreamReaderInfo->suid, sStreamReaderInfo->uid, sStreamReaderInfo->tableType,
1645
        groupNew, false, sStreamReaderInfo->pTagCond, sStreamReaderInfo->pTagIndexCond, &api,
1646
        &pTableList, sStreamReaderInfo->groupIdMap));
1647
  }
1648

1649
  STREAM_CHECK_RET_GOTO(createBlockForWalMeta(&pBlock, isVTable));
7,344!
1650
  STREAM_CHECK_RET_GOTO(scanWal(pVnode, isVTable ? sStreamReaderInfo->uidHash : pTableList, isVTable, pBlock,
7,367!
1651
                                req->walMetaReq.lastVer, sStreamReaderInfo->deleteReCalc,
1652
                                sStreamReaderInfo->deleteOutTbl, req->walMetaReq.ctime, &lastVer));
1653

1654
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlock->info.rows);
7,368✔
1655
  STREAM_CHECK_RET_GOTO(buildRsp(pBlock, &buf, &size));
7,368!
1656
  printDataBlock(pBlock, __func__, "");
7,356✔
1657

1658
end:
7,374✔
1659
  if (pBlock != NULL && pBlock->info.rows == 0) {
7,374✔
1660
    code = TSDB_CODE_STREAM_NO_DATA;
6,957✔
1661
    buf = rpcMallocCont(sizeof(int64_t));
6,957✔
1662
    *(int64_t *)buf = lastVer;
6,961✔
1663
    size = sizeof(int64_t);
6,961✔
1664
  }
1665
  SRpcMsg rsp = {
7,378✔
1666
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1667
  tmsgSendRsp(&rsp);
7,378✔
1668
  if (code == TSDB_CODE_STREAM_NO_DATA){
7,387✔
1669
    code = 0;
6,959✔
1670
  }
1671
  STREAM_PRINT_LOG_END_WITHID(code, lino);
7,387!
1672
  nodesDestroyList(groupNew);
7,387✔
1673
  blockDataDestroy(pBlock);
7,381✔
1674
  qStreamDestroyTableList(pTableList);
7,393✔
1675

1676
  return code;
7,388✔
1677
}
1678

1679
static int32_t vnodeProcessStreamWalDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
7,632✔
1680
  int32_t      code = 0;
7,632✔
1681
  int32_t      lino = 0;
7,632✔
1682
  void*        buf = NULL;
7,632✔
1683
  size_t       size = 0;
7,632✔
1684
  SSDataBlock* pBlock = NULL;
7,632✔
1685

1686
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
7,632!
1687
  void* pTask = sStreamReaderInfo->pTask;
7,632✔
1688
  ST_TASK_DLOG("vgId:%d %s start, request type:%d skey:%" PRId64 ",ekey:%" PRId64 ",uid:%" PRId64 ",ver:%" PRId64, TD_VID(pVnode),
7,632✔
1689
  __func__, req->walDataReq.base.type, req->walDataReq.skey, req->walDataReq.ekey, req->walDataReq.uid, req->walDataReq.ver);
1690

1691
  STimeWindow window = {.skey = req->walDataReq.skey, .ekey = req->walDataReq.ekey};
7,633✔
1692
  if (req->walDataReq.base.type == STRIGGER_PULL_WAL_DATA){
7,633✔
1693
    STREAM_CHECK_RET_GOTO(processWalVerDataVTable(pVnode, req->walDataReq.cids, req->walDataReq.ver,
2,865!
1694
      req->walDataReq.uid, &window, &pBlock));
1695
  } else if (req->walDataReq.base.type == STRIGGER_PULL_WAL_CALC_DATA){
4,768✔
1696
    STREAM_CHECK_RET_GOTO(processWalVerData(pVnode, sStreamReaderInfo, req->walDataReq.ver, false,
2,603!
1697
      req->walDataReq.uid, &window, sStreamReaderInfo->calcResBlock, &pBlock));
1698
  } else if (req->walDataReq.base.type == STRIGGER_PULL_WAL_TRIGGER_DATA) {
2,165✔
1699
    STREAM_CHECK_RET_GOTO(processWalVerData(pVnode, sStreamReaderInfo, req->walDataReq.ver, true,
1,962!
1700
      req->walDataReq.uid, &window, sStreamReaderInfo->triggerResBlock, &pBlock));
1701
  } else if (req->walDataReq.base.type == STRIGGER_PULL_WAL_TS_DATA){
203!
1702
    STREAM_CHECK_RET_GOTO(processWalVerData(pVnode, sStreamReaderInfo, req->walDataReq.ver, false,
203!
1703
      req->walDataReq.uid, &window, sStreamReaderInfo->tsBlock, &pBlock));
1704

1705
  }
1706
  
1707
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlock->info.rows);
7,627✔
1708
  printDataBlock(pBlock, __func__, "");
7,627✔
1709
  STREAM_CHECK_RET_GOTO(buildRsp(pBlock, &buf, &size));
7,627!
1710
end:
7,629✔
1711
  STREAM_PRINT_LOG_END_WITHID(code, lino);
7,629!
1712
  SRpcMsg rsp = {
7,629✔
1713
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1714
  tmsgSendRsp(&rsp);
7,629✔
1715

1716
  blockDataDestroy(pBlock);
7,633✔
1717
  return code;
7,632✔
1718
}
1719

1720
static int32_t vnodeProcessStreamGroupColValueReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
563✔
1721
  int32_t code = 0;
563✔
1722
  int32_t lino = 0;
563✔
1723
  void*   buf = NULL;
563✔
1724
  size_t  size = 0;
563✔
1725

1726
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
563!
1727
  void* pTask = sStreamReaderInfo->pTask;
563✔
1728
  ST_TASK_DLOG("vgId:%d %s start, request gid:%" PRId64, TD_VID(pVnode), __func__, req->groupColValueReq.gid);
563✔
1729

1730
  SArray** gInfo = taosHashGet(sStreamReaderInfo->groupIdMap, &req->groupColValueReq.gid, POINTER_BYTES);
563✔
1731
  STREAM_CHECK_NULL_GOTO(gInfo, TSDB_CODE_STREAM_NO_CONTEXT);
563!
1732
  SStreamGroupInfo pGroupInfo = {0};
563✔
1733
  pGroupInfo.gInfo = *gInfo;
563✔
1734

1735
  size = tSerializeSStreamGroupInfo(NULL, 0, &pGroupInfo, TD_VID(pVnode));
563✔
1736
  STREAM_CHECK_CONDITION_GOTO(size < 0, size);
1737
  buf = rpcMallocCont(size);
563✔
1738
  STREAM_CHECK_NULL_GOTO(buf, terrno);
563!
1739
  size = tSerializeSStreamGroupInfo(buf, size, &pGroupInfo, TD_VID(pVnode));
563✔
1740
  STREAM_CHECK_CONDITION_GOTO(size < 0, size);
1741
end:
563✔
1742
  if (code != 0) {
563!
1743
    rpcFreeCont(buf);
×
1744
    buf = NULL;
×
1745
    size = 0;
×
1746
  }
1747
  STREAM_PRINT_LOG_END_WITHID(code, lino);
563!
1748
  SRpcMsg rsp = {
563✔
1749
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1750
  tmsgSendRsp(&rsp);
563✔
1751

1752
  return code;
563✔
1753
}
1754

1755
static int32_t vnodeProcessStreamVTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
150✔
1756
  int32_t              code = 0;
150✔
1757
  int32_t              lino = 0;
150✔
1758
  void*                buf = NULL;
150✔
1759
  size_t               size = 0;
150✔
1760
  SStreamMsgVTableInfo vTableInfo = {0};
150✔
1761
  SMetaReader          metaReader = {0};
150✔
1762
  SNodeList*           groupNew = NULL;
150✔
1763
  void*                pTableList = NULL;
150✔
1764
  SStorageAPI api = {0};
150✔
1765
  initStorageAPI(&api);
150✔
1766

1767
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
150!
1768
  void* pTask = sStreamReaderInfo->pTask;
150✔
1769
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
150✔
1770

1771
  STREAM_CHECK_RET_GOTO(nodesCloneList(sStreamReaderInfo->partitionCols, &groupNew));
150!
1772
  STREAM_CHECK_RET_GOTO(qStreamCreateTableListForReader(
151!
1773
      pVnode, sStreamReaderInfo->suid, sStreamReaderInfo->uid, sStreamReaderInfo->tableType,
1774
      groupNew, true, sStreamReaderInfo->pTagCond, sStreamReaderInfo->pTagIndexCond, &api,
1775
      &pTableList, sStreamReaderInfo->groupIdMap));
1776

1777
  SArray* cids = req->virTableInfoReq.cids;
151✔
1778
  STREAM_CHECK_NULL_GOTO(cids, terrno);
151!
1779

1780
  SArray* pTableListArray = qStreamGetTableArrayList(pTableList);
151✔
1781
  STREAM_CHECK_NULL_GOTO(pTableListArray, terrno);
151!
1782

1783
  vTableInfo.infos = taosArrayInit(taosArrayGetSize(pTableListArray), sizeof(VTableInfo));
151✔
1784
  STREAM_CHECK_NULL_GOTO(vTableInfo.infos, terrno);
151!
1785
  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
151✔
1786

1787
  for (size_t i = 0; i < taosArrayGetSize(pTableListArray); i++) {
393✔
1788
    STableKeyInfo* pKeyInfo = taosArrayGet(pTableListArray, i);
242✔
1789
    if (pKeyInfo == NULL) {
242!
1790
      continue;
×
1791
    }
1792
    VTableInfo* vTable = taosArrayReserve(vTableInfo.infos, 1);
242✔
1793
    STREAM_CHECK_NULL_GOTO(vTable, terrno);
242!
1794
    vTable->uid = pKeyInfo->uid;
242✔
1795
    vTable->gId = pKeyInfo->groupId;
242✔
1796

1797
    code = api.metaReaderFn.getTableEntryByUid(&metaReader, pKeyInfo->uid);
242✔
1798
    if (taosArrayGetSize(cids) == 1 && *(col_id_t*)taosArrayGet(cids, 0) == PRIMARYKEY_TIMESTAMP_COL_ID){
242!
1799
      vTable->cols.nCols = metaReader.me.colRef.nCols;
×
1800
      vTable->cols.version = metaReader.me.colRef.version;
×
1801
      vTable->cols.pColRef = taosMemoryCalloc(metaReader.me.colRef.nCols, sizeof(SColRef));
×
1802
      for (size_t j = 0; j < metaReader.me.colRef.nCols; j++) {
×
1803
        memcpy(vTable->cols.pColRef + j, &metaReader.me.colRef.pColRef[j], sizeof(SColRef));
×
1804
      }
1805
    } else {
1806
      vTable->cols.nCols = taosArrayGetSize(cids);
242✔
1807
      vTable->cols.version = metaReader.me.colRef.version;
242✔
1808
      vTable->cols.pColRef = taosMemoryCalloc(taosArrayGetSize(cids), sizeof(SColRef));
242!
1809
      for (size_t i = 0; i < taosArrayGetSize(cids); i++) {
1,352✔
1810
        for (size_t j = 0; j < metaReader.me.colRef.nCols; j++) {
4,956✔
1811
          if (metaReader.me.colRef.pColRef[j].hasRef &&
4,715✔
1812
              metaReader.me.colRef.pColRef[j].id == *(col_id_t*)taosArrayGet(cids, i)) {
3,604✔
1813
            memcpy(vTable->cols.pColRef + i, &metaReader.me.colRef.pColRef[j], sizeof(SColRef));
869✔
1814
            break;
869✔
1815
          }
1816
        }
1817
      }
1818
    }
1819
    tDecoderClear(&metaReader.coder);
241✔
1820
  }
1821
  ST_TASK_DLOG("vgId:%d %s end", TD_VID(pVnode), __func__);
151✔
1822
  STREAM_CHECK_RET_GOTO(buildVTableInfoRsp(&vTableInfo, &buf, &size));
151!
1823

1824
end:
151✔
1825
  nodesDestroyList(groupNew);
151✔
1826
  qStreamDestroyTableList(pTableList);
151✔
1827
  tDestroySStreamMsgVTableInfo(&vTableInfo);
151✔
1828
  api.metaReaderFn.clearReader(&metaReader);
151✔
1829
  STREAM_PRINT_LOG_END_WITHID(code, lino);
151!
1830
  SRpcMsg rsp = {
151✔
1831
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1832
  tmsgSendRsp(&rsp);
151✔
1833
  return code;
151✔
1834
}
1835

1836
static int32_t vnodeProcessStreamOTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req) {
66✔
1837
  int32_t                   code = 0;
66✔
1838
  int32_t                   lino = 0;
66✔
1839
  void*                     buf = NULL;
66✔
1840
  size_t                    size = 0;
66✔
1841
  SSTriggerOrigTableInfoRsp oTableInfo = {0};
66✔
1842
  SMetaReader               metaReader = {0};
66✔
1843
  int64_t streamId = req->base.streamId;
66✔
1844
  stsDebug("vgId:%d %s start", TD_VID(pVnode), __func__);
66✔
1845

1846
  SStorageAPI api = {0};
66✔
1847
  initStorageAPI(&api);
66✔
1848

1849
  SArray* cols = req->origTableInfoReq.cols;
66✔
1850
  STREAM_CHECK_NULL_GOTO(cols, terrno);
66!
1851

1852
  oTableInfo.cols = taosArrayInit(taosArrayGetSize(cols), sizeof(OTableInfoRsp));
66✔
1853

1854
  STREAM_CHECK_NULL_GOTO(oTableInfo.cols, terrno);
66!
1855

1856
  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
66✔
1857
  for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
459✔
1858
    OTableInfo*    oInfo = taosArrayGet(cols, i);
393✔
1859
    OTableInfoRsp* vTableInfo = taosArrayReserve(oTableInfo.cols, 1);
393✔
1860
    STREAM_CHECK_NULL_GOTO(oInfo, terrno);
393!
1861
    STREAM_CHECK_NULL_GOTO(vTableInfo, terrno);
393!
1862
    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByName(&metaReader, oInfo->refTableName));
393!
1863
    vTableInfo->uid = metaReader.me.uid;
392✔
1864
    stsDebug("vgId:%d %s uid:%"PRId64, TD_VID(pVnode), __func__, vTableInfo->uid);
392✔
1865

1866
    SSchemaWrapper* sSchemaWrapper = NULL;
394✔
1867
    if (metaReader.me.type == TD_CHILD_TABLE) {
394✔
1868
      int64_t suid = metaReader.me.ctbEntry.suid;
391✔
1869
      vTableInfo->suid = suid;
391✔
1870
      tDecoderClear(&metaReader.coder);
391✔
1871
      STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, suid));
391!
1872
      sSchemaWrapper = &metaReader.me.stbEntry.schemaRow;
390✔
1873
    } else if (metaReader.me.type == TD_NORMAL_TABLE) {
3!
1874
      vTableInfo->suid = 0;
3✔
1875
      sSchemaWrapper = &metaReader.me.ntbEntry.schemaRow;
3✔
1876
    } else {
1877
      stError("invalid table type:%d", metaReader.me.type);
×
1878
    }
1879

1880
    for (size_t j = 0; j < sSchemaWrapper->nCols; j++) {
1,657!
1881
      SSchema* s = sSchemaWrapper->pSchema + j;
1,657✔
1882
      if (strcmp(s->name, oInfo->refColName) == 0) {
1,657✔
1883
        vTableInfo->cid = s->colId;
393✔
1884
        break;
393✔
1885
      }
1886
    }
1887
    if (vTableInfo->cid == 0) {
393!
1888
      stError("vgId:%d %s, not found col %s in table %s", TD_VID(pVnode), __func__, oInfo->refColName,
×
1889
              oInfo->refTableName);
1890
    }
1891
    tDecoderClear(&metaReader.coder);
393✔
1892
  }
1893

1894
  STREAM_CHECK_RET_GOTO(buildOTableInfoRsp(&oTableInfo, &buf, &size));
66!
1895

1896
end:
66✔
1897
  tDestroySTriggerOrigTableInfoRsp(&oTableInfo);
66✔
1898
  api.metaReaderFn.clearReader(&metaReader);
66✔
1899
  STREAM_PRINT_LOG_END(code, lino);
66!
1900
  SRpcMsg rsp = {
66✔
1901
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
1902
  tmsgSendRsp(&rsp);
66✔
1903
  return code;
66✔
1904
}
1905

1906
static int32_t vnodeProcessStreamVTableTagInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req) {
947✔
1907
  int32_t                   code = 0;
947✔
1908
  int32_t                   lino = 0;
947✔
1909
  void*                     buf = NULL;
947✔
1910
  size_t                    size = 0;
947✔
1911
  SSDataBlock* pBlock = NULL;
947✔
1912

1913
  SMetaReader               metaReader = {0};
947✔
1914
  SMetaReader               metaReaderStable = {0};
947✔
1915
  int64_t streamId = req->base.streamId;
947✔
1916
  stsDebug("vgId:%d %s start", TD_VID(pVnode), __func__);
947✔
1917

1918
  SStorageAPI api = {0};
947✔
1919
  initStorageAPI(&api);
947✔
1920

1921
  SArray* cols = req->virTablePseudoColReq.cids;
947✔
1922
  STREAM_CHECK_NULL_GOTO(cols, terrno);
947!
1923

1924
  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
947✔
1925
  STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, req->virTablePseudoColReq.uid));
947!
1926

1927
  STREAM_CHECK_CONDITION_GOTO(metaReader.me.type != TD_VIRTUAL_CHILD_TABLE && metaReader.me.type != TD_VIRTUAL_NORMAL_TABLE, TSDB_CODE_INVALID_PARA);
947!
1928

1929
  STREAM_CHECK_RET_GOTO(createDataBlock(&pBlock));
947!
1930
  if (metaReader.me.type == TD_VIRTUAL_NORMAL_TABLE) {
947!
UNCOV
1931
    STREAM_CHECK_CONDITION_GOTO (taosArrayGetSize(cols) < 1 && *(col_id_t*)taosArrayGet(cols, 0) != -1, TSDB_CODE_INVALID_PARA);
×
UNCOV
1932
    SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN, -1);
×
UNCOV
1933
    STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
×
UNCOV
1934
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, 1));
×
UNCOV
1935
    pBlock->info.rows = 1;
×
UNCOV
1936
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, 0);
×
UNCOV
1937
    STREAM_CHECK_NULL_GOTO(pDst, terrno);
×
UNCOV
1938
    STREAM_CHECK_RET_GOTO(varColSetVarData(pDst, 0, metaReader.me.name, strlen(metaReader.me.name), false));
×
1939
  } else if (metaReader.me.type == TD_VIRTUAL_CHILD_TABLE){
947!
1940
    int64_t suid = metaReader.me.ctbEntry.suid;
947✔
1941
    api.metaReaderFn.readerReleaseLock(&metaReader);
947✔
1942
    api.metaReaderFn.initReader(&metaReaderStable, pVnode, META_READER_LOCK, &api.metaFn);
947✔
1943

1944
    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReaderStable, suid));
947!
1945
    SSchemaWrapper*  sSchemaWrapper = &metaReaderStable.me.stbEntry.schemaTag;
947✔
1946
    for (size_t i = 0; i < taosArrayGetSize(cols); i++){
6,904✔
1947
      col_id_t* id = taosArrayGet(cols, i);
5,957✔
1948
      STREAM_CHECK_NULL_GOTO(id, terrno);
5,957!
1949
      if (*id == -1) {
5,957✔
1950
        SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN, -1);
947✔
1951
        STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
947!
1952
        continue;
947✔
1953
      }
1954
      size_t j = 0;
5,010✔
1955
      for (; j < sSchemaWrapper->nCols; j++) {
20,937!
1956
        SSchema* s = sSchemaWrapper->pSchema + j;
20,937✔
1957
        if (s->colId == *id) {
20,937✔
1958
          SColumnInfoData idata = createColumnInfoData(s->type, s->bytes, s->colId);
5,010✔
1959
          STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
5,010!
1960
          break;
5,010✔
1961
        }
1962
      }
1963
      if (j == sSchemaWrapper->nCols) {
5,010!
1964
        SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_NULL, CHAR_BYTES, *id);
×
1965
        STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
×
1966
      }
1967
    }
1968
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, 1));
947!
1969
    pBlock->info.rows = 1;
947✔
1970
    
1971
    for (size_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++){
6,904✔
1972
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
5,957✔
1973
      STREAM_CHECK_NULL_GOTO(pDst, terrno);
5,957!
1974

1975
      if (pDst->info.colId == -1) {
5,957✔
1976
        STREAM_CHECK_RET_GOTO(varColSetVarData(pDst, 0, metaReader.me.name, strlen(metaReader.me.name), false));
947!
1977
        continue;
947✔
1978
      }
1979
      if (pDst->info.type == TSDB_DATA_TYPE_NULL) {
5,010!
1980
        STREAM_CHECK_RET_GOTO(colDataSetVal(pDst, 0, NULL, true));
×
1981
        continue;
×
1982
      }
1983

1984
      STagVal val = {0};
5,010✔
1985
      val.cid = pDst->info.colId;
5,010✔
1986
      const char* p = api.metaFn.extractTagVal(metaReader.me.ctbEntry.pTags, pDst->info.type, &val);
5,010✔
1987

1988
      char* data = NULL;
5,010✔
1989
      if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
5,010!
1990
        data = tTagValToData((const STagVal*)p, false);
4,411✔
1991
      } else {
1992
        data = (char*)p;
599✔
1993
      }
1994

1995
      STREAM_CHECK_RET_GOTO(colDataSetVal(pDst, 0, data,
5,010!
1996
                            (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data))));
1997

1998
      if ((pDst->info.type != TSDB_DATA_TYPE_JSON) && (p != NULL) && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
5,010!
1999
          (data != NULL)) {
2000
        taosMemoryFree(data);
3,399!
2001
      }
2002
    }
2003
  } else {
2004
    stError("vgId:%d %s, invalid table type:%d", TD_VID(pVnode), __func__, metaReader.me.type);
×
2005
    code = TSDB_CODE_INVALID_PARA;
×
2006
    goto end;
×
2007
  }
2008
  
2009
  stsDebug("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlock->info.rows);
947✔
2010
  printDataBlock(pBlock, __func__, "");
947✔
2011
  STREAM_CHECK_RET_GOTO(buildRsp(pBlock, &buf, &size));
947!
2012

2013
end:
947✔
2014
  api.metaReaderFn.clearReader(&metaReaderStable);
947✔
2015
  api.metaReaderFn.clearReader(&metaReader);
947✔
2016
  STREAM_PRINT_LOG_END(code, lino);
947!
2017
  SRpcMsg rsp = {
947✔
2018
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
2019
  tmsgSendRsp(&rsp);
947✔
2020
  blockDataDestroy(pBlock);
947✔
2021
  return code;
947✔
2022
}
2023

2024
static int32_t vnodeProcessStreamFetchMsg(SVnode* pVnode, SRpcMsg* pMsg) {
15,776✔
2025
  int32_t            code = 0;
15,776✔
2026
  int32_t            lino = 0;
15,776✔
2027
  void*              buf = NULL;
15,776✔
2028
  size_t             size = 0;
15,776✔
2029
  void*              taskAddr = NULL;
15,776✔
2030
  SArray*            pResList = NULL;
15,776✔
2031

2032
  SResFetchReq req = {0};
15,776✔
2033
  STREAM_CHECK_CONDITION_GOTO(tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0,
15,776!
2034
                              TSDB_CODE_QRY_INVALID_INPUT);
2035
  SArray* calcInfoList = (SArray*)qStreamGetReaderInfo(req.queryId, req.taskId, &taskAddr);
15,776✔
2036
  STREAM_CHECK_NULL_GOTO(calcInfoList, terrno);
15,776!
2037

2038
  STREAM_CHECK_CONDITION_GOTO(req.execId < 0, TSDB_CODE_INVALID_PARA);
15,776!
2039
  SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo = taosArrayGetP(calcInfoList, req.execId);
15,776✔
2040
  STREAM_CHECK_NULL_GOTO(sStreamReaderCalcInfo, terrno);
15,776!
2041
  void* pTask = sStreamReaderCalcInfo->pTask;
15,776✔
2042
  ST_TASK_DLOG("vgId:%d %s start, execId:%d, reset:%d, pTaskInfo:%p, scan type:%d", TD_VID(pVnode), __func__, req.execId, req.reset,
15,776✔
2043
               sStreamReaderCalcInfo->pTaskInfo, nodeType(sStreamReaderCalcInfo->calcAst->pNode));
2044

2045
  if (req.reset || sStreamReaderCalcInfo->pTaskInfo == NULL) {
15,776!
2046
    qDestroyTask(sStreamReaderCalcInfo->pTaskInfo);
15,683✔
2047
    int64_t uid = 0;
15,683✔
2048
    if (req.dynTbname) {
15,683✔
2049
      SArray* vals = req.pStRtFuncInfo->pStreamPartColVals;
6✔
2050
      for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
6!
2051
        SStreamGroupValue* pValue = taosArrayGet(vals, i);
6✔
2052
        if (pValue != NULL && pValue->isTbname) {
6!
2053
          uid = pValue->uid;
6✔
2054
          break;
6✔
2055
        }
2056
      }
2057
    }
2058
    
2059
    SReadHandle handle = {0};
15,683✔
2060
    handle.vnode = pVnode;
15,683✔
2061
    handle.uid = uid;
15,683✔
2062

2063
    initStorageAPI(&handle.api);
15,683✔
2064
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(sStreamReaderCalcInfo->calcAst->pNode) ||
15,683✔
2065
      QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == nodeType(sStreamReaderCalcInfo->calcAst->pNode)){
15,326!
2066
      STimeRangeNode* node = (STimeRangeNode*)((STableScanPhysiNode*)(sStreamReaderCalcInfo->calcAst->pNode))->pTimeRange;
15,683✔
2067
      if (node != NULL) {
15,683✔
2068
        STREAM_CHECK_RET_GOTO(processCalaTimeRange(sStreamReaderCalcInfo, &req, node, &handle));
15,659!
2069
      }
2070
    }
2071

2072
    TSWAP(sStreamReaderCalcInfo->rtInfo.funcInfo, *req.pStRtFuncInfo);
15,683✔
2073
    handle.streamRtInfo = &sStreamReaderCalcInfo->rtInfo;
15,683✔
2074

2075
    // if (sStreamReaderCalcInfo->pTaskInfo == NULL) {
2076
    STREAM_CHECK_RET_GOTO(qCreateStreamExecTaskInfo(&sStreamReaderCalcInfo->pTaskInfo,
15,683!
2077
                                                    sStreamReaderCalcInfo->calcScanPlan, &handle, NULL, TD_VID(pVnode),
2078
                                                    req.taskId));
2079
    // } else {
2080
    // STREAM_CHECK_RET_GOTO(qResetTableScan(sStreamReaderCalcInfo->pTaskInfo, handle.winRange));
2081
    // }
2082

2083
    STREAM_CHECK_RET_GOTO(qSetTaskId(sStreamReaderCalcInfo->pTaskInfo, req.taskId, req.queryId));
15,683!
2084
  }
2085

2086
  if (req.pOpParam != NULL) {
15,776!
UNCOV
2087
    qUpdateOperatorParam(sStreamReaderCalcInfo->pTaskInfo, req.pOpParam);
×
2088
  }
2089
  
2090
  pResList = taosArrayInit(4, POINTER_BYTES);
15,776✔
2091
  STREAM_CHECK_NULL_GOTO(pResList, terrno);
15,776!
2092
  uint64_t ts = 0;
15,776✔
2093
  bool     hasNext = false;
15,776✔
2094
  STREAM_CHECK_RET_GOTO(qExecTaskOpt(sStreamReaderCalcInfo->pTaskInfo, pResList, &ts, &hasNext, NULL, req.pOpParam != NULL));
15,776!
2095

2096
  for(size_t i = 0; i < taosArrayGetSize(pResList); i++){
25,735✔
2097
    SSDataBlock* pBlock = taosArrayGetP(pResList, i);
9,959✔
2098
    if (pBlock == NULL) continue;
9,959!
2099
    printDataBlock(pBlock, __func__, "fetch");
9,959✔
2100
    if (sStreamReaderCalcInfo->rtInfo.funcInfo.withExternalWindow) {
9,959✔
2101
      STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, sStreamReaderCalcInfo->pFilterInfo));
360!
2102
      printDataBlock(pBlock, __func__, "fetch filter");
360✔
2103
    }
2104
  }
2105

2106
  STREAM_CHECK_RET_GOTO(streamBuildFetchRsp(pResList, hasNext, &buf, &size, pVnode->config.tsdbCfg.precision));
15,776!
2107
  ST_TASK_DLOG("vgId:%d %s end:", TD_VID(pVnode), __func__);
15,776✔
2108

2109
end:
16✔
2110
  taosArrayDestroy(pResList);
15,776✔
2111
  streamReleaseTask(taskAddr);
15,776✔
2112

2113
  STREAM_PRINT_LOG_END(code, lino);
15,776!
2114
  SRpcMsg rsp = {.msgType = TDMT_STREAM_FETCH_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
15,776✔
2115
  tmsgSendRsp(&rsp);
15,776✔
2116
  tDestroySResFetchReq(&req);
15,776✔
2117
  return code;
15,776✔
2118
}
2119

2120
int32_t vnodeProcessStreamReaderMsg(SVnode* pVnode, SRpcMsg* pMsg) {
62,685✔
2121
  int32_t                   code = 0;
62,685✔
2122
  int32_t                   lino = 0;
62,685✔
2123
  SSTriggerPullRequestUnion req = {0};
62,685✔
2124
  void*                     taskAddr = NULL;
62,685✔
2125

2126
  vDebug("vgId:%d, msg:%p in stream reader queue is processing", pVnode->config.vgId, pMsg);
62,685✔
2127
  if (!syncIsReadyForRead(pVnode->sync)) {
62,688✔
2128
    vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
163✔
2129
    return 0;
168✔
2130
  }
2131

2132
  if (pMsg->msgType == TDMT_STREAM_FETCH) {
62,539✔
2133
    return vnodeProcessStreamFetchMsg(pVnode, pMsg);
15,776✔
2134
  } else if (pMsg->msgType == TDMT_STREAM_TRIGGER_PULL) {
46,763✔
2135
    void*   pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
46,755✔
2136
    int32_t len = pMsg->contLen - sizeof(SMsgHead);
46,755✔
2137
    STREAM_CHECK_RET_GOTO(tDeserializeSTriggerPullRequest(pReq, len, &req));
46,755!
2138
    stDebug("vgId:%d %s start, type:%d, streamId:%" PRIx64 ", readerTaskId:%" PRIx64 ", sessionId:%" PRIx64,
46,753✔
2139
            TD_VID(pVnode), __func__, req.base.type, req.base.streamId, req.base.readerTaskId, req.base.sessionId);
2140
    SStreamTriggerReaderInfo* sStreamReaderInfo = (STRIGGER_PULL_OTABLE_INFO == req.base.type) ? NULL : qStreamGetReaderInfo(req.base.streamId, req.base.readerTaskId, &taskAddr);
46,757✔
2141
    switch (req.base.type) {
46,748!
2142
      case STRIGGER_PULL_SET_TABLE:
66✔
2143
        code = vnodeProcessStreamSetTableReq(pVnode, pMsg, &req, sStreamReaderInfo);
66✔
2144
        break;
66✔
2145
      case STRIGGER_PULL_LAST_TS:
343✔
2146
        code = vnodeProcessStreamLastTsReq(pVnode, pMsg, &req, sStreamReaderInfo);
343✔
2147
        break;
341✔
2148
      case STRIGGER_PULL_FIRST_TS:
328✔
2149
        code = vnodeProcessStreamFirstTsReq(pVnode, pMsg, &req, sStreamReaderInfo);
328✔
2150
        break;
328✔
2151
      case STRIGGER_PULL_TSDB_META:
313✔
2152
      case STRIGGER_PULL_TSDB_META_NEXT:
2153
        code = vnodeProcessStreamTsdbMetaReq(pVnode, pMsg, &req, sStreamReaderInfo);
313✔
2154
        break;
313✔
2155
      case STRIGGER_PULL_TSDB_TS_DATA:
32✔
2156
        code = vnodeProcessStreamTsdbTsDataReq(pVnode, pMsg, &req, sStreamReaderInfo);
32✔
2157
        break;
32✔
2158
      case STRIGGER_PULL_TSDB_TRIGGER_DATA:
340✔
2159
      case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT:
2160
        code = vnodeProcessStreamTsdbTriggerDataReq(pVnode, pMsg, &req, sStreamReaderInfo);
340✔
2161
        break;
340✔
2162
      case STRIGGER_PULL_TSDB_CALC_DATA:
28,368✔
2163
      case STRIGGER_PULL_TSDB_CALC_DATA_NEXT:
2164
        code = vnodeProcessStreamTsdbCalcDataReq(pVnode, pMsg, &req, sStreamReaderInfo);
28,368✔
2165
        break;
28,363✔
2166
      case STRIGGER_PULL_TSDB_DATA:
224✔
2167
      case STRIGGER_PULL_TSDB_DATA_NEXT:
2168
        code = vnodeProcessStreamTsdbVirtalDataReq(pVnode, pMsg, &req, sStreamReaderInfo);
224✔
2169
        break;
224✔
2170
      case STRIGGER_PULL_WAL_META:
7,375✔
2171
        code = vnodeProcessStreamWalMetaReq(pVnode, pMsg, &req, sStreamReaderInfo);
7,375✔
2172
        break;
7,381✔
2173
      case STRIGGER_PULL_WAL_TS_DATA:
7,632✔
2174
      case STRIGGER_PULL_WAL_TRIGGER_DATA:
2175
      case STRIGGER_PULL_WAL_CALC_DATA:
2176
      case STRIGGER_PULL_WAL_DATA:
2177
        code = vnodeProcessStreamWalDataReq(pVnode, pMsg, &req, sStreamReaderInfo);
7,632✔
2178
        break;
7,634✔
2179
      case STRIGGER_PULL_GROUP_COL_VALUE:
563✔
2180
        code = vnodeProcessStreamGroupColValueReq(pVnode, pMsg, &req, sStreamReaderInfo);
563✔
2181
        break;
563✔
2182
      case STRIGGER_PULL_VTABLE_INFO:
151✔
2183
        code = vnodeProcessStreamVTableInfoReq(pVnode, pMsg, &req, sStreamReaderInfo);
151✔
2184
        break;
151✔
2185
      case STRIGGER_PULL_VTABLE_PSEUDO_COL:
947✔
2186
        code = vnodeProcessStreamVTableTagInfoReq(pVnode, pMsg, &req);
947✔
2187
        break;
947✔
2188
      case STRIGGER_PULL_OTABLE_INFO:
66✔
2189
        code = vnodeProcessStreamOTableInfoReq(pVnode, pMsg, &req);
66✔
2190
        break;
66✔
2191
      default:
×
2192
        vError("unknown inner msg type:%d in stream reader queue", req.base.type);
×
2193
        code = TSDB_CODE_APP_ERROR;
×
2194
        break;
×
2195
    }
2196
  } else {
2197
    vError("unknown msg type:%d in stream reader queue", pMsg->msgType);
8!
2198
    code = TSDB_CODE_APP_ERROR;
×
2199
  }
2200
end:
46,749✔
2201

2202
  streamReleaseTask(taskAddr);
46,749✔
2203

2204
  tDestroySTriggerPullRequest(&req);
46,761✔
2205
  STREAM_PRINT_LOG_END(code, lino);
46,754!
2206
  return code;
46,761✔
2207
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc