• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4720

08 Sep 2025 08:43AM UTC coverage: 58.139% (-0.6%) from 58.762%
#4720

push

travis-ci

web-flow
Merge pull request #32881 from taosdata/enh/add-new-windows-ci

fix(ci): update workflow reference to use new Windows CI YAML

133181 of 292179 branches covered (45.58%)

Branch coverage included in aggregate %.

201691 of 283811 relevant lines covered (71.07%)

5442780.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.45
/source/dnode/vnode/src/vnd/vnodeStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "scalar.h"
17
#include "streamReader.h"
18
#include "tdatablock.h"
19
#include "tdb.h"
20
#include "tencode.h"
21
#include "tglobal.h"
22
#include "tmsg.h"
23
#include "vnd.h"
24
#include "vnode.h"
25
#include "vnodeInt.h"
26

27
/*
 * Declares a local SStreamTriggerReaderTaskInnerOptions variable named `options` and fills it
 * from the reader info `sStreamInfo` plus the explicit arguments.
 * `.suid` is taken from sStreamInfo only when `_uidList` is NULL; otherwise it is 0.
 * Every argument is parenthesized so that non-trivial expressions expand with the intended
 * precedence. The expansion already ends with ';'.
 */
#define BUILD_OPTION(options, sStreamInfo, _ver, _groupSort, _order, startTime, endTime, _schemas, _isSchema, _scanMode, \
                     _gid, _initReader, _uidList)                                                                        \
  SStreamTriggerReaderTaskInnerOptions options = {.suid = ((_uidList) == NULL ? (sStreamInfo)->suid : 0),          \
                                                  .uid = (sStreamInfo)->uid,                                       \
                                                  .ver = (_ver),                                                   \
                                                  .tableType = (sStreamInfo)->tableType,                           \
                                                  .groupSort = (_groupSort),                                       \
                                                  .order = (_order),                                               \
                                                  .twindows = {.skey = (startTime), .ekey = (endTime)},            \
                                                  .pTagCond = (sStreamInfo)->pTagCond,                             \
                                                  .pTagIndexCond = (sStreamInfo)->pTagIndexCond,                   \
                                                  .pConditions = (sStreamInfo)->pConditions,                       \
                                                  .partitionCols = (sStreamInfo)->partitionCols,                   \
                                                  .schemas = (_schemas),                                           \
                                                  .isSchema = (_isSchema),                                         \
                                                  .scanMode = (_scanMode),                                         \
                                                  .gid = (_gid),                                                   \
                                                  .initReader = (_initReader),                                     \
                                                  .uidList = (_uidList)};
46

47
/*
 * Combines a session id and a type into one 64-bit key: `type` occupies the upper 32 bits and is
 * OR-ed with `session`.
 */
static int64_t getSessionKey(int64_t session, int64_t type) {
  // Shift in the unsigned domain: left-shifting a negative (or overflowing) signed value is
  // undefined behavior in C, while the unsigned shift is well defined and yields the same bits.
  return (int64_t)((uint64_t)session | ((uint64_t)type << 32));
}
29,293✔
48

49
static int32_t buildScheamFromMeta(SVnode* pVnode, int64_t uid, SArray** schemas);
50

51
/*
 * Copies one fixed-width value into column `index` of pResBlock, at the row slot given by the
 * block's current row count (pResBlock->info.rows). The row count itself is not modified here,
 * and no capacity check is performed.
 * Returns 0 on success, or terrno when the column lookup returns NULL.
 */
static int32_t addColData(SSDataBlock* pResBlock, int32_t index, void* data) {
  SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, index);
  if (NULL == pCol) {
    return terrno;
  }

  // Destination is the slot just past the rows already stored in this column.
  void* pDst = pCol->pData + pResBlock->info.rows * pCol->info.bytes;
  memcpy(pDst, data, pCol->info.bytes);
  return 0;
}
60

61
/*
 * Advances the task's tsdb reader to its next data block and reports through *hasNext whether one
 * exists. When the reader call fails, the current data block is released before the error code is
 * returned.
 */
static int32_t getTableDataInfo(SStreamReaderTaskInner* pTask, bool* hasNext) {
  int32_t ret = pTask->api.tsdReader.tsdNextDataBlock(pTask->pReader, hasNext);
  if (TSDB_CODE_SUCCESS == ret) {
    return ret;
  }

  pTask->api.tsdReader.tsdReaderReleaseDataBlock(pTask->pReader);
  return ret;
}
69

70
/*
 * Retrieves the data block the task's tsdb reader is currently positioned on into *ppRes.
 * The reader's return code is passed through unchanged.
 */
static int32_t getTableData(SStreamReaderTaskInner* pTask, SSDataBlock** ppRes) {
  int32_t ret = pTask->api.tsdReader.tsdReaderRetrieveDataBlock(pTask->pReader, ppRes, NULL);
  return ret;
}
73

74
static int32_t buildOTableInfoRsp(const SSTriggerOrigTableInfoRsp* rsp, void** data, size_t* size) {
69✔
75
  int32_t code = 0;
69✔
76
  int32_t lino = 0;
69✔
77
  void*   buf = NULL;
69✔
78
  int32_t len = tSerializeSTriggerOrigTableInfoRsp(NULL, 0, rsp);
69✔
79
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
69!
80
  buf = rpcMallocCont(len);
69✔
81
  STREAM_CHECK_NULL_GOTO(buf, terrno);
69!
82
  int32_t actLen = tSerializeSTriggerOrigTableInfoRsp(buf, len, rsp);
69✔
83
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
69!
84
  *data = buf;
69✔
85
  *size = len;
69✔
86
  buf = NULL;
69✔
87
end:
69✔
88
  rpcFreeCont(buf);
69✔
89
  return code;
69✔
90
}
91

92
static int32_t buildVTableInfoRsp(const SStreamMsgVTableInfo* rsp, void** data, size_t* size) {
196✔
93
  int32_t code = 0;
196✔
94
  int32_t lino = 0;
196✔
95
  void*   buf = NULL;
196✔
96
  int32_t len = tSerializeSStreamMsgVTableInfo(NULL, 0, rsp);
196✔
97
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
196!
98
  buf = rpcMallocCont(len);
196✔
99
  STREAM_CHECK_NULL_GOTO(buf, terrno);
196!
100
  int32_t actLen = tSerializeSStreamMsgVTableInfo(buf, len, rsp);
196✔
101
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
196!
102
  *data = buf;
196✔
103
  *size = len;
196✔
104
  buf = NULL;
196✔
105
end:
196✔
106
  rpcFreeCont(buf);
196✔
107
  return code;
196✔
108
}
109

110
static int32_t buildTsRsp(const SStreamTsResponse* tsRsp, void** data, size_t* size) {
692✔
111
  int32_t code = 0;
692✔
112
  int32_t lino = 0;
692✔
113
  void*   buf = NULL;
692✔
114
  int32_t len = tSerializeSStreamTsResponse(NULL, 0, tsRsp);
692✔
115
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
692!
116
  buf = rpcMallocCont(len);
692✔
117
  STREAM_CHECK_NULL_GOTO(buf, terrno);
692!
118
  int32_t actLen = tSerializeSStreamTsResponse(buf, len, tsRsp);
692✔
119
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
692!
120
  *data = buf;
692✔
121
  *size = len;
692✔
122
  buf = NULL;
692✔
123
end:
692✔
124
  rpcFreeCont(buf);
692✔
125
  return code;
692✔
126
}
127

128

129
/*
 * Encodes pBlock into a newly allocated RPC content buffer.
 * A NULL block or a block with zero rows is not an error: the function returns success and leaves
 * *data and *size untouched. Otherwise, on success, *data receives the buffer and *size the value
 * returned by blockGetEncodeSize() (not the length returned by blockEncode()).
 * Returns terrno when the allocation fails or blockEncode() returns a negative value.
 */
static int32_t buildRsp(SSDataBlock* pBlock, void** data, size_t* size) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   pCont = NULL;

  bool nothingToSend = (pBlock == NULL) || (pBlock->info.rows == 0);
  STREAM_CHECK_CONDITION_GOTO(nothingToSend, TSDB_CODE_SUCCESS);

  size_t encodeSize = blockGetEncodeSize(pBlock);
  pCont = rpcMallocCont(encodeSize);
  STREAM_CHECK_NULL_GOTO(pCont, terrno);

  int32_t encodedLen = blockEncode(pBlock, pCont, encodeSize, taosArrayGetSize(pBlock->pDataBlock));
  STREAM_CHECK_CONDITION_GOTO(encodedLen < 0, terrno);

  // Ownership moves to the caller; clearing pCont keeps the cleanup below from releasing it.
  *size = encodeSize;
  *data = pCont;
  pCont = NULL;

end:
  rpcFreeCont(pCont);
  return code;
}
146

147
/*
 * Re-targets the task's tsdb reader at the table list of the current group
 * (pTask->currentGroupIndex): sets the reader's query table list, rebuilds pTask->cond from the
 * task options, and resets the reader status with the new condition.
 * Returns the first non-success code of the calls involved.
 */
static int32_t resetTsdbReader(SStreamReaderTaskInner* pTask) {
  int32_t        code = 0;
  int32_t        lino = 0;
  int32_t        numOfTables = 1;
  STableKeyInfo* pKeys = NULL;

  STREAM_CHECK_RET_GOTO(qStreamGetTableList(pTask->pTableList, pTask->currentGroupIndex, &pKeys, &numOfTables));
  STREAM_CHECK_RET_GOTO(pTask->api.tsdReader.tsdSetQueryTableList(pTask->pReader, pKeys, numOfTables));

  // Drop the previous query condition before building a new one from the task options.
  cleanupQueryTableDataCond(&pTask->cond);
  STREAM_CHECK_RET_GOTO(qStreamInitQueryTableDataCond(&pTask->cond, pTask->options.order, pTask->options.schemas,
                                                      true, pTask->options.twindows, pTask->options.suid,
                                                      pTask->options.ver));

  STREAM_CHECK_RET_GOTO(pTask->api.tsdReader.tsdReaderResetStatus(pTask->pReader, &pTask->cond));

end:
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
164

165
/*
 * Writes one WAL meta record into pBlock at the block's current row slot (the caller increments
 * pBlock->info.rows afterwards).
 * Column layout: type, [gid — only when !isVTable], uid, skey, ekey, ver, rows.
 * Returns the first non-zero code from addColData().
 */
static int32_t buildWalMetaBlock(SSDataBlock* pBlock, int8_t type, int64_t gid, bool isVTable, int64_t uid,
                                 int64_t skey, int64_t ekey, int64_t ver, int64_t rows) {
  int32_t code = 0;
  int32_t lino = 0;

  // Virtual-table blocks have no gid column, so the remaining columns start one slot earlier.
  const int32_t base = isVTable ? 1 : 2;

  STREAM_CHECK_RET_GOTO(addColData(pBlock, 0, &type));
  if (!isVTable) {
    STREAM_CHECK_RET_GOTO(addColData(pBlock, 1, &gid));
  }
  STREAM_CHECK_RET_GOTO(addColData(pBlock, base, &uid));
  STREAM_CHECK_RET_GOTO(addColData(pBlock, base + 1, &skey));
  STREAM_CHECK_RET_GOTO(addColData(pBlock, base + 2, &ekey));
  STREAM_CHECK_RET_GOTO(addColData(pBlock, base + 3, &ver));
  STREAM_CHECK_RET_GOTO(addColData(pBlock, base + 4, &rows));

end:
  STREAM_PRINT_LOG_END(code, lino)
  return code;
}
184

185
/*
 * Initializes pTSchema as a single-column schema with the given version, column id, type and byte
 * width. The caller must provide storage for at least one column entry.
 */
static void buildTSchema(STSchema* pTSchema, int32_t ver, col_id_t colId, int8_t type, int32_t bytes) {
  STColumn* pOnly = &pTSchema->columns[0];

  pOnly->bytes = bytes;
  pOnly->type = type;
  pOnly->colId = colId;

  pTSchema->version = ver;
  pTSchema->numOfCols = 1;
}
5,326✔
192

193
/*
 * Appends one WAL_SUBMIT_DATA meta row (gid, uid, first/last timestamp, version, row count) to
 * pBlock for a single submitted table block.
 *
 * pTableList is a hash keyed by uid when isVTable is true, otherwise a table list queried through
 * qStreamUidInTableList()/qStreamGetGroupId(). Tables not present in the list are skipped and the
 * function returns success without touching pBlock.
 *
 * The first and last timestamps are read from column 0 (column format) or from the first and last
 * row using a temporary one-column timestamp schema (row format).
 *
 * Returns 0 on success or skip, otherwise the failing call's code.
 */
int32_t retrieveWalMetaData(SSubmitTbData* pSubmitTbData, void* pTableList, bool isVTable, SSDataBlock* pBlock,
                            int64_t ver) {
  int32_t code = 0;
  int32_t lino = 0;

  int64_t   uid = pSubmitTbData->uid;
  int32_t   numOfRows = 0;
  int64_t   skey = 0;
  int64_t   ekey = 0;
  STSchema* pTSchema = NULL;
  uint64_t  gid = 0;
  if (isVTable) {
    STREAM_CHECK_CONDITION_GOTO(taosHashGet(pTableList, &uid, sizeof(uid)) == NULL, TSDB_CODE_SUCCESS);
  } else {
    STREAM_CHECK_CONDITION_GOTO(!qStreamUidInTableList(pTableList, uid), TSDB_CODE_SUCCESS);
    gid = qStreamGetGroupId(pTableList, uid);
  }

  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    SColData* pCol = taosArrayGet(pSubmitTbData->aCol, 0);
    STREAM_CHECK_NULL_GOTO(pCol, terrno);
    numOfRows = pCol->nVal;

    SColVal colVal = {0};
    STREAM_CHECK_RET_GOTO(tColDataGetValue(pCol, 0, &colVal));
    skey = VALUE_GET_TRIVIAL_DATUM(&colVal.value);

    STREAM_CHECK_RET_GOTO(tColDataGetValue(pCol, numOfRows - 1, &colVal));
    ekey = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
  } else {
    numOfRows = taosArrayGetSize(pSubmitTbData->aRowP);
    SRow* pRow = taosArrayGetP(pSubmitTbData->aRowP, 0);
    STREAM_CHECK_NULL_GOTO(pRow, terrno);
    SColVal colVal = {0};
    // Only the timestamp column is needed, so a one-column schema is enough to decode it.
    pTSchema = taosMemoryCalloc(1, sizeof(STSchema) + sizeof(STColumn));
    STREAM_CHECK_NULL_GOTO(pTSchema, terrno);
    buildTSchema(pTSchema, pSubmitTbData->sver, PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES);
    STREAM_CHECK_RET_GOTO(tRowGet(pRow, pTSchema, 0, &colVal));
    skey = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
    pRow = taosArrayGetP(pSubmitTbData->aRowP, numOfRows - 1);
    // Check the last row pointer the same way as the first one before decoding it.
    STREAM_CHECK_NULL_GOTO(pRow, terrno);
    STREAM_CHECK_RET_GOTO(tRowGet(pRow, pTSchema, 0, &colVal));
    ekey = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
  }

  STREAM_CHECK_RET_GOTO(buildWalMetaBlock(pBlock, WAL_SUBMIT_DATA, gid, isVTable, uid, skey, ekey, ver, numOfRows));
  pBlock->info.rows++;
  // uid/skey/ekey are signed 64-bit values, so they are printed with PRId64; gid is unsigned.
  stDebug("stream reader scan submit data:uid %" PRId64 ", skey %" PRId64 ", ekey %" PRId64 ", gid %" PRIu64
          ", rows:%d",
          uid, skey, ekey, gid, numOfRows);

end:
  taosMemoryFree(pTSchema);
  STREAM_PRINT_LOG_END(code, lino)
  return code;
}
248

249
/*
 * Materializes the rows of one submitted table block into pBlock, optionally restricted to the
 * time window [window->skey, window->ekey] (no restriction when window is NULL).
 *
 * The table schema for (uid, sver) is loaded from meta and released before returning.
 *  - Column format: the in-window row range [rowStart, rowEnd) is located on column 0, then each
 *    block column is filled from the submitted column with the same column id.
 *  - Row format: each in-window row is decoded against the schema; a block column whose id is
 *    missing from the row, or whose value is not set, becomes NULL.
 *
 * On success pBlock->info.rows is set to the number of rows written. If no row falls into the
 * window in column format, the function returns success without updating pBlock->info.rows.
 *
 * Returns 0 on success, TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND when the schema cannot be loaded,
 * otherwise the failing call's code.
 */
int32_t retrieveWalData(SVnode* pVnode, SSubmitTbData* pSubmitTbData, SSDataBlock* pBlock, STimeWindow* window) {
  stDebug("stream reader retrieve data block %p", pSubmitTbData);
  int32_t code = 0;
  int32_t lino = 0;

  int32_t ver = pSubmitTbData->sver;
  int64_t uid = pSubmitTbData->uid;
  int32_t numOfRows = 0;

  STSchema* schemas = metaGetTbTSchema(pVnode->pMeta, uid, ver, 1);
  STREAM_CHECK_NULL_GOTO(schemas, TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND);
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    SColData* pCol = taosArrayGet(pSubmitTbData->aCol, 0);
    STREAM_CHECK_NULL_GOTO(pCol, terrno);
    int32_t rowStart = 0;
    int32_t rowEnd = pCol->nVal;
    if (window != NULL) {
      SColVal colVal = {0};
      rowStart = -1;
      rowEnd = -1;
      for (int32_t k = 0; k < pCol->nVal; k++) {
        STREAM_CHECK_RET_GOTO(tColDataGetValue(pCol, k, &colVal));
        int64_t ts = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
        if (ts >= window->skey && rowStart == -1) {
          rowStart = k;
        }
        if (ts > window->ekey && rowEnd == -1) {
          rowEnd = k;
        }
      }
      STREAM_CHECK_CONDITION_GOTO(rowStart == -1 || rowStart == rowEnd, TSDB_CODE_SUCCESS);

      if (rowStart != -1 && rowEnd == -1) {
        rowEnd = pCol->nVal;
      }
    }
    numOfRows = rowEnd - rowStart;
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, numOfRows));
    for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
      STREAM_CHECK_NULL_GOTO(pColData, terrno);
      if (i >= taosArrayGetSize(pSubmitTbData->aCol)) {
        break;
      }
      for (int32_t j = 0; j < taosArrayGetSize(pSubmitTbData->aCol); j++) {
        SColData* pSrcCol = taosArrayGet(pSubmitTbData->aCol, j);
        STREAM_CHECK_NULL_GOTO(pSrcCol, terrno);
        if (pSrcCol->cid == pColData->info.colId) {
          for (int32_t k = rowStart; k < rowEnd; k++) {
            SColVal colVal = {0};
            STREAM_CHECK_RET_GOTO(tColDataGetValue(pSrcCol, k, &colVal));
            // The block holds only the in-window rows (capacity numOfRows, rows 0..numOfRows-1),
            // so source row k lands at destination row k - rowStart. Writing at k would misplace
            // the data and run past the ensured capacity whenever rowStart > 0.
            STREAM_CHECK_RET_GOTO(colDataSetVal(pColData, k - rowStart,
                                                VALUE_GET_DATUM(&colVal.value, colVal.value.type),
                                                !COL_VAL_IS_VALUE(&colVal)));
          }
        }
      }
    }
  } else {
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, taosArrayGetSize(pSubmitTbData->aRowP)));
    for (int32_t j = 0; j < taosArrayGetSize(pSubmitTbData->aRowP); j++) {
      SRow* pRow = taosArrayGetP(pSubmitTbData->aRowP, j);
      STREAM_CHECK_NULL_GOTO(pRow, terrno);
      SColVal colVal = {0};
      STREAM_CHECK_RET_GOTO(tRowGet(pRow, schemas, 0, &colVal));
      int64_t ts = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
      if (window != NULL && (ts < window->skey || ts > window->ekey)) {
        continue;
      }
      for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {  // reader todo test null
        if (i >= schemas->numOfCols) {
          break;
        }
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
        STREAM_CHECK_NULL_GOTO(pColData, terrno);
        SColVal colVal = {0};
        int32_t sourceIdx = 0;
        // Advance through the row's columns until one with an id >= the block column's id is found.
        while (1) {
          STREAM_CHECK_RET_GOTO(tRowGet(pRow, schemas, sourceIdx, &colVal));
          if (colVal.cid < pColData->info.colId) {
            sourceIdx++;
            continue;
          } else {
            break;
          }
        }
        if (colVal.cid == pColData->info.colId && COL_VAL_IS_VALUE(&colVal)) {
          if (IS_VAR_DATA_TYPE(colVal.value.type) || colVal.value.type == TSDB_DATA_TYPE_DECIMAL){
            STREAM_CHECK_RET_GOTO(varColSetVarData(pColData, numOfRows, colVal.value.pData, colVal.value.nData, !COL_VAL_IS_VALUE(&colVal)));
          } else {
            STREAM_CHECK_RET_GOTO(colDataSetVal(pColData, numOfRows, (const char*)(&(colVal.value.val)), !COL_VAL_IS_VALUE(&colVal)));
          }
        } else {
          colDataSetNULL(pColData, numOfRows);
        }
      }
      numOfRows++;
    }
  }

  pBlock->info.rows = numOfRows;

end:
  taosMemoryFree(schemas);
  STREAM_PRINT_LOG_END(code, lino)
  return code;
}
355

356
/*
 * Appends one WAL_DELETE_DATA meta row for table `uid` covering [req->skey, req->ekey] to pBlock.
 *
 * pTableList is a hash keyed by uid when isVTable is true; otherwise the group id is looked up
 * through qStreamGetGroupId(). Tables not found (hash miss, or group id equal to (uint64_t)-1)
 * are skipped and the function returns success without touching pBlock.
 *
 * Returns 0 on success or skip, otherwise the failing call's code.
 */
static int32_t buildDeleteData(void* pTableList, bool isVTable, SSDataBlock* pBlock, SDeleteRes* req, int64_t uid, int64_t ver){
  int32_t    code = 0;
  int32_t    lino = 0;
  uint64_t   gid = 0;
  // uid/skey/ekey are signed 64-bit values, so they are printed with PRId64.
  stDebug("stream reader scan delete start data:uid %" PRId64 ", skey %" PRId64 ", ekey %" PRId64, uid, req->skey, req->ekey);
  if (isVTable) {
    STREAM_CHECK_CONDITION_GOTO(taosHashGet(pTableList, &uid, sizeof(uid)) == NULL, TSDB_CODE_SUCCESS);
  } else {
    gid = qStreamGetGroupId(pTableList, uid);
    STREAM_CHECK_CONDITION_GOTO(gid == -1, TSDB_CODE_SUCCESS);
  }
  // addColData() writes at row index info.rows without any bounds check, so make sure the block
  // has room for the row about to be appended (the submit path does the same before appending).
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, pBlock->info.rows + 1));
  STREAM_CHECK_RET_GOTO(
      buildWalMetaBlock(pBlock, WAL_DELETE_DATA, gid, isVTable, uid, req->skey, req->ekey, ver, 1));
  pBlock->info.rows++;

  stDebug("stream reader scan delete end data:uid %" PRId64 ", skey %" PRId64 ", ekey %" PRId64, uid, req->skey, req->ekey);
end:
  return code;
}
375

376
/*
 * Decodes a delete message body (`data`, `len`) and appends one delete meta row to pBlock for each
 * uid listed in the decoded result, via buildDeleteData().
 *
 * Returns 0 on success, terrno when the uid array cannot be allocated, otherwise the failing
 * call's code. The uid array and the decoder are always released.
 */
static int32_t scanDeleteData(void* pTableList, bool isVTable, SSDataBlock* pBlock, void* data, int32_t len,
                              int64_t ver) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SDecoder   decoder = {0};
  SDeleteRes req = {0};
  tDecoderInit(&decoder, data, len);
  req.uidList = taosArrayInit(0, sizeof(tb_uid_t));
  // Fail explicitly on allocation failure instead of decoding into a NULL uid list.
  STREAM_CHECK_NULL_GOTO(req.uidList, terrno);
  STREAM_CHECK_RET_GOTO(tDecodeDeleteRes(&decoder, &req));

  for (int32_t i = 0; i < taosArrayGetSize(req.uidList); i++) {
    uint64_t* uid = taosArrayGet(req.uidList, i);
    STREAM_CHECK_NULL_GOTO(uid, terrno);
    STREAM_CHECK_RET_GOTO(buildDeleteData(pTableList, isVTable, pBlock, &req, *uid, ver));
  }

end:
  taosArrayDestroy(req.uidList);
  tDecoderClear(&decoder);
  return code;
}
397

398
/*
 * Decodes a drop-table batch request (`data`, `len`) and appends one WAL_DELETE_TABLE meta row to
 * pBlock for every dropped table that belongs to pTableList.
 *
 * pTableList is a hash keyed by uid when isVTable is true; otherwise the group id is looked up
 * through qStreamGetGroupId() and entries whose group id equals (uint64_t)-1 are skipped.
 *
 * Returns 0 on success, otherwise the failing call's code. The decoder is always cleared.
 */
static int32_t scanDropTable(void* pTableList, bool isVTable, SSDataBlock* pBlock, void* data, int32_t len,
                             int64_t ver) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SDecoder decoder = {0};

  SVDropTbBatchReq req = {0};
  tDecoderInit(&decoder, data, len);
  STREAM_CHECK_RET_GOTO(tDecodeSVDropTbBatchReq(&decoder, &req));
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    SVDropTbReq* pDropTbReq = req.pReqs + iReq;
    STREAM_CHECK_NULL_GOTO(pDropTbReq, TSDB_CODE_INVALID_PARA);
    uint64_t gid = 0;
    if (isVTable) {
      if (taosHashGet(pTableList, &pDropTbReq->uid, sizeof(pDropTbReq->uid)) == NULL) {
        continue;
      }
    } else {
      gid = qStreamGetGroupId(pTableList, pDropTbReq->uid);
      if (gid == -1) continue;
    }

    // addColData() writes at row index info.rows without any bounds check, so make sure the block
    // has room for the row about to be appended (the submit path does the same before appending).
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, pBlock->info.rows + 1));
    STREAM_CHECK_RET_GOTO(buildWalMetaBlock(pBlock, WAL_DELETE_TABLE, gid, isVTable, pDropTbReq->uid, 0, 0, ver, 1));
    pBlock->info.rows++;
    stDebug("stream reader scan drop :uid %" PRIu64 ", gid %" PRIu64, pDropTbReq->uid, gid);
  }

end:
  tDecoderClear(&decoder);
  return code;
}
429

430
/*
 * Decodes a submit request body (`data`, `len`) and appends one meta row per submitted table block
 * to pBlock through retrieveWalMetaData(). Block capacity is grown up front by the number of table
 * blocks in the request.
 *
 * Returns 0 on success, otherwise the failing call's code. The decoded request and the decoder are
 * always released.
 */
static int32_t scanSubmitData(void* pTableList, bool isVTable, SSDataBlock* pBlock, void* data, int32_t len,
                              int64_t ver) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SDecoder    decoder = {0};
  SSubmitReq2 submit = {0};

  tDecoderInit(&decoder, data, len);
  STREAM_CHECK_RET_GOTO(tDecodeSubmitReq(&decoder, &submit, NULL));

  int32_t numOfBlocks = taosArrayGetSize(submit.aSubmitTbData);

  // Worst case: every table block contributes one meta row.
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, pBlock->info.rows + numOfBlocks));

  for (int32_t blk = 0; blk < numOfBlocks; blk++) {
    stDebug("stream reader scan submit, next data block %d/%d", blk, numOfBlocks);
    SSubmitTbData* pTbData = taosArrayGet(submit.aSubmitTbData, blk);
    STREAM_CHECK_NULL_GOTO(pTbData, terrno);
    STREAM_CHECK_RET_GOTO(retrieveWalMetaData(pTbData, pTableList, isVTable, pBlock, ver));
  }

end:
  tDestroySubmitReq(&submit, TSDB_MSG_FLG_DECODE);
  tDecoderClear(&decoder);
  return code;
}
456

457
/*
 * Scans the vnode WAL starting at version lastVer + 1 and collects meta rows into pBlock:
 *  - delete messages, only when deleteData != 0;
 *  - drop-table messages, only when deleteTb != 0;
 *  - submit messages, always.
 *
 * *retVer is refreshed with the WAL applied version before each read. Scanning stops when the seek
 * fails or no further valid message is available (both return success), when a message's
 * ingestTs / 1000 exceeds ctime, or once pBlock holds at least STREAM_RETURN_ROWS_NUM rows.
 *
 * Returns 0 on success, terrno when the WAL reader cannot be opened, otherwise the failing scan
 * helper's code.
 */
static int32_t scanWal(SVnode* pVnode, void* pTableList, bool isVTable, SSDataBlock* pBlock, int64_t lastVer,
                       int8_t deleteData, int8_t deleteTb, int64_t ctime, int64_t* retVer) {
  int32_t code = 0;
  int32_t lino = 0;

  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);
  *retVer = walGetAppliedVer(pWalReader->pWal);
  STREAM_CHECK_CONDITION_GOTO(walReaderSeekVer(pWalReader, lastVer + 1) != 0, TSDB_CODE_SUCCESS);

  for (;;) {
    *retVer = walGetAppliedVer(pWalReader->pWal);
    STREAM_CHECK_CONDITION_GOTO(walNextValidMsg(pWalReader, true) < 0, TSDB_CODE_SUCCESS);

    SWalCont* pCont = &pWalReader->pHead->head;
    if (pCont->ingestTs / 1000 > ctime) {
      break;
    }

    // Default payload view: message body after the generic message head.
    void*   pPayload = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
    int32_t payloadLen = pCont->bodyLen - sizeof(SMsgHead);
    int64_t walVer = pCont->version;

    stDebug("vgId:%d stream reader scan wal ver:%" PRId64 ", type:%d, deleteData:%d, deleteTb:%d",
      TD_VID(pVnode), walVer, pCont->msgType, deleteData, deleteTb);

    if (pCont->msgType == TDMT_VND_DELETE && deleteData != 0) {
      STREAM_CHECK_RET_GOTO(scanDeleteData(pTableList, isVTable, pBlock, pPayload, payloadLen, walVer));
    } else if (pCont->msgType == TDMT_VND_DROP_TABLE && deleteTb != 0) {
      STREAM_CHECK_RET_GOTO(scanDropTable(pTableList, isVTable, pBlock, pPayload, payloadLen, walVer));
    } else if (pCont->msgType == TDMT_VND_SUBMIT) {
      // Submit messages are offset by the submit-specific header instead of SMsgHead.
      pPayload = POINTER_SHIFT(pCont->body, sizeof(SSubmitReq2Msg));
      payloadLen = pCont->bodyLen - sizeof(SSubmitReq2Msg);
      STREAM_CHECK_RET_GOTO(scanSubmitData(pTableList, isVTable, pBlock, pPayload, payloadLen, walVer));
    }

    if (pBlock->info.rows >= STREAM_RETURN_ROWS_NUM) {
      break;
    }
  }

end:
  walCloseReader(pWalReader);
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
499

500
/*
 * Reads the single WAL entry at version `ver`, which must be a submit message, and merges the rows
 * of table `uid` (optionally limited to `window`) into pBlockRet.
 *
 * pBlock is a scratch block: each matching table block is materialized into it by
 * retrieveWalData(), merged into pBlockRet, and then cleaned up. Table blocks for other uids are
 * skipped.
 *
 * Returns 0 on success, terrno when the WAL reader cannot be opened,
 * TSDB_CODE_STREAM_WAL_VER_NOT_DATA when the entry is not a submit message, otherwise the failing
 * call's code.
 */
int32_t scanWalOneVer(SVnode* pVnode, SSDataBlock* pBlock, SSDataBlock* pBlockRet,
                      int64_t ver, int64_t uid, STimeWindow* window) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SDecoder    decoder = {0};
  SSubmitReq2 submit = {0};

  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);

  STREAM_CHECK_RET_GOTO(walFetchHead(pWalReader, ver));
  STREAM_CHECK_CONDITION_GOTO(pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT, TSDB_CODE_STREAM_WAL_VER_NOT_DATA);
  STREAM_CHECK_RET_GOTO(walFetchBody(pWalReader));

  // The encoded submit request starts after the submit-specific message header.
  SWalCont* pCont = &pWalReader->pHead->head;
  void*     pPayload = POINTER_SHIFT(pCont->body, sizeof(SSubmitReq2Msg));
  int32_t   payloadLen = pCont->bodyLen - sizeof(SSubmitReq2Msg);

  tDecoderInit(&decoder, pPayload, payloadLen);
  STREAM_CHECK_RET_GOTO(tDecodeSubmitReq(&decoder, &submit, NULL));

  int32_t numOfBlocks = taosArrayGetSize(submit.aSubmitTbData);
  for (int32_t blk = 0; blk < numOfBlocks; blk++) {
    stDebug("stream reader next data block %d/%d", blk, numOfBlocks);
    SSubmitTbData* pTbData = taosArrayGet(submit.aSubmitTbData, blk);
    STREAM_CHECK_NULL_GOTO(pTbData, terrno);
    if (pTbData->uid != uid) {
      stDebug("stream reader skip data block uid:%" PRId64, pTbData->uid);
      continue;
    }

    STREAM_CHECK_RET_GOTO(retrieveWalData(pVnode, pTbData, pBlock, window));
    printDataBlock(pBlock, __func__, "");

    STREAM_CHECK_RET_GOTO(blockDataMerge(pBlockRet, pBlock));
    blockDataCleanup(pBlock);
  }

end:
  tDestroySubmitReq(&submit, TSDB_MSG_FLG_DECODE);
  walCloseReader(pWalReader);
  tDecoderClear(&decoder);
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
543

544
/*
 * Fills the tag and tbname pseudo columns of pBlock for the table pBlock->info.id.uid.
 *
 * For each of the numOfExpr expressions, the destination column is selected by the expression's
 * result slot id and set to the same value for all pBlock->info.rows rows:
 *  - scan pseudo-column functions: only FUNCTION_TYPE_TBNAME is handled, it writes the table name;
 *  - anything else is treated as a tag whose value is extracted from the table's tag data.
 * If the table entry could not be loaded (mr.me.name == NULL), the columns are set to NULL.
 *
 * Returns 0 on success (including numOfExpr == 0), otherwise the failing call's code.
 */
static int32_t processTag(SVnode* pVnode, SExprInfo* pExpr, int32_t numOfExpr, SStorageAPI* api, SSDataBlock* pBlock) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SMetaReader mr = {0};

  if (numOfExpr == 0) {
    return TSDB_CODE_SUCCESS;
  }
  api->metaReaderFn.initReader(&mr, pVnode, META_READER_LOCK, &api->metaFn);
  code = api->metaReaderFn.getEntryGetUidCache(&mr, pBlock->info.id.uid);
  if (code != TSDB_CODE_SUCCESS) {
    // Not fatal here: the loop below writes NULLs when no entry was loaded.
    stError("failed to get table meta, uid:%" PRId64 ", code:%s", pBlock->info.id.uid, tstrerror(code));
  }
  api->metaReaderFn.readerReleaseLock(&mr);
  for (int32_t j = 0; j < numOfExpr; ++j) {
    const SExprInfo* pExpr1 = &pExpr[j];
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
    STREAM_CHECK_NULL_GOTO(pColInfoData, terrno);
    if (mr.me.name == NULL) {
      colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
      continue;
    }
    colInfoDataCleanup(pColInfoData, pBlock->info.rows);

    int32_t functionId = pExpr1->pExpr->_function.functionId;

    // this is to handle the tbname
    if (fmIsScanPseudoColumnFunc(functionId)) {
      int32_t fType = pExpr1->pExpr->_function.functionType;
      if (fType == FUNCTION_TYPE_TBNAME) {
        STREAM_CHECK_RET_GOTO(setTbNameColData(pBlock, pColInfoData, functionId, mr.me.name));
        pColInfoData->info.colId = -1;
      }
    } else {  // these are tags
      STagVal tagVal = {0};
      tagVal.cid = pExpr1->base.pParam[0].pCol->colId;
      const char* p = api->metaFn.extractTagVal(mr.me.ctbEntry.pTags, pColInfoData->info.type, &tagVal);

      char* data = NULL;
      if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
        data = tTagValToData((const STagVal*)p, false);
      } else {
        data = (char*)p;
      }

      bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
      if (isNullVal) {
        colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
      } else {
        code = colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows, false);
        // For JSON columns `data` is just `p` (not produced by tTagValToData() above), and `p` is
        // not handled as an STagVal on that path. Restrict the free to the non-JSON path so the
        // pointer is never reinterpreted as an STagVal and memory this function does not own is
        // never released.
        if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
          taosMemoryFree(data);
        }
        STREAM_CHECK_RET_GOTO(code);
      }
    }
  }

end:
  api->metaReaderFn.clearReader(&mr);

  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
610

611
/*
 * Builds the result block for one WAL version and one table.
 *
 * Rows of table `uid` at WAL version `ver` (optionally limited to `window`) are collected into a
 * block shaped like sStreamInfo->triggerResBlock, tag/tbname columns are filled when rows exist,
 * and the reader's filter conditions are applied.
 *  - isTrigger: *pBlock receives that block directly (ownership moves to the caller).
 *  - otherwise: *pBlock is created with the layout of pSrcBlock and filled through
 *    blockDataTransform(); the intermediate block is destroyed here.
 *
 * Returns 0 on success, otherwise the failing call's code. The filter and the temporary blocks
 * still owned by this function are always released.
 */
static int32_t processWalVerData(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamInfo, int64_t ver, bool isTrigger,
                                 int64_t uid, STimeWindow* window, SSDataBlock* pSrcBlock, SSDataBlock** pBlock) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SFilterInfo* pFilter = NULL;
  SSDataBlock* pScratch = NULL;  // per-table-block staging area used by scanWalOneVer()
  SSDataBlock* pMerged = NULL;   // accumulated rows for this version

  SExprInfo* pTagExprs = sStreamInfo->pExprInfo;
  int32_t    numOfTagExprs = sStreamInfo->numOfExpr;

  STREAM_CHECK_RET_GOTO(filterInitFromNode(sStreamInfo->pConditions, &pFilter, 0, NULL));

  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamInfo->triggerResBlock, false, &pScratch));
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamInfo->triggerResBlock, false, &pMerged));
  if (!isTrigger) {
    STREAM_CHECK_RET_GOTO(createOneDataBlock(pSrcBlock, false, pBlock));
  }

  pMerged->info.id.uid = uid;
  pScratch->info.id.uid = uid;

  STREAM_CHECK_RET_GOTO(scanWalOneVer(pVnode, pScratch, pMerged, ver, uid, window));

  if (pMerged->info.rows > 0) {
    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(processTag(pVnode, pTagExprs, numOfTagExprs, &api, pMerged));
  }
  STREAM_CHECK_RET_GOTO(qStreamFilter(pMerged, pFilter));

  if (isTrigger) {
    // Hand the merged block to the caller; clearing the local keeps cleanup from destroying it.
    *pBlock = pMerged;
    pMerged = NULL;
  } else {
    blockDataTransform(*pBlock, pMerged);
  }

  printDataBlock(*pBlock, __func__, "processWalVerData2");

end:
  STREAM_PRINT_LOG_END(code, lino);
  filterFreeInfo(pFilter);
  blockDataDestroy(pScratch);
  blockDataDestroy(pMerged);
  return code;
}
655

656
/*
 * Collect the column schemas of table `uid` into a newly allocated SArray<SSchema>.
 *
 * For a child table the schema row of its super table is used; for a normal table its own schema
 * row is used. Any other table type is rejected with TSDB_CODE_INVALID_PARA.
 *
 * On success *schemas owns the new array (caller destroys it). On failure the array is destroyed
 * here and *schemas is reset to NULL, so a caller that destroys its pointer unconditionally does
 * not free it twice.
 */
static int32_t buildScheamFromMeta(SVnode* pVnode, int64_t uid, SArray** schemas) {
  int32_t         code = 0;
  int32_t         lino = 0;
  SMetaReader     metaReader = {0};
  SStorageAPI     api = {0};
  SSchemaWrapper* sSchemaWrapper = NULL;

  initStorageAPI(&api);
  *schemas = taosArrayInit(8, sizeof(SSchema));
  STREAM_CHECK_NULL_GOTO(*schemas, terrno);

  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
  STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, uid));

  if (metaReader.me.type == TD_CHILD_TABLE) {
    // Re-read the reader with the super table's uid to reach its schema row.
    int64_t suid = metaReader.me.ctbEntry.suid;
    tDecoderClear(&metaReader.coder);
    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, suid));
    sSchemaWrapper = &metaReader.me.stbEntry.schemaRow;
  } else if (metaReader.me.type == TD_NORMAL_TABLE) {
    sSchemaWrapper = &metaReader.me.ntbEntry.schemaRow;
  } else {
    qError("invalid table type:%d", metaReader.me.type);
  }
  // Without this check an unsupported table type would dereference a NULL wrapper below.
  STREAM_CHECK_NULL_GOTO(sSchemaWrapper, TSDB_CODE_INVALID_PARA);

  for (size_t j = 0; j < sSchemaWrapper->nCols; j++) {
    SSchema* s = sSchemaWrapper->pSchema + j;
    STREAM_CHECK_NULL_GOTO(taosArrayPush(*schemas, s), terrno);
  }

end:
  api.metaReaderFn.clearReader(&metaReader);
  STREAM_PRINT_LOG_END(code, lino);
  if (code != 0) {
    taosArrayDestroy(*schemas);
    *schemas = NULL;  // do not hand a dangling pointer back to the caller
  }
  return code;
}
691

692
/*
 * Reduce `schemas` in place to the entries whose colId appears in `cols`, in the order given by
 * `cols`.
 *
 * The selected entries are first appended behind the original ones (capacity is reserved up
 * front), then the original leading `schemaLen` entries are popped from the front. Column ids
 * with no matching schema are skipped.
 */
static int32_t shrinkScheams(SArray* cols, SArray* schemas) {
  int32_t code = 0;
  int32_t lino = 0;
  size_t  schemaLen = taosArrayGetSize(schemas);

  STREAM_CHECK_RET_GOTO(taosArrayEnsureCap(schemas, schemaLen + taosArrayGetSize(cols)));

  for (size_t colIdx = 0; colIdx < taosArrayGetSize(cols); colIdx++) {
    col_id_t* wanted = taosArrayGet(cols, colIdx);
    STREAM_CHECK_NULL_GOTO(wanted, terrno);

    // Only the original entries [0, schemaLen) are searched; stop at the first match.
    for (size_t schIdx = 0; schIdx < schemaLen; schIdx++) {
      SSchema* candidate = taosArrayGet(schemas, schIdx);
      STREAM_CHECK_NULL_GOTO(candidate, terrno);
      if (*wanted != candidate->colId) {
        continue;
      }
      STREAM_CHECK_NULL_GOTO(taosArrayPush(schemas, candidate), terrno);
      break;
    }
  }
  taosArrayPopFrontBatch(schemas, schemaLen);

end:
  return code;
}
714

715
/*
 * Scan the WAL entry at version `ver` for table `uid` (limited to `window`) into a block whose
 * columns are the table's schemas restricted to the column ids in `cids`.
 *
 * On success *pBlock takes ownership of the scanned block. Returns 0 or the first error code.
 */
static int32_t processWalVerDataVTable(SVnode* pVnode, SArray *cids, int64_t ver,
  int64_t uid, STimeWindow* window, SSDataBlock** pBlock) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SArray*      schemas = NULL;

  SSDataBlock* pBlock1 = NULL;
  SSDataBlock* pBlock2 = NULL;

  code = buildScheamFromMeta(pVnode, uid, &schemas);
  if (code != 0) {
    // buildScheamFromMeta() destroys the array it allocated before reporting an error; drop the
    // pointer so the cleanup below cannot destroy it a second time.
    schemas = NULL;
    lino = __LINE__;
    goto end;
  }
  STREAM_CHECK_RET_GOTO(shrinkScheams(cids, schemas));
  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, &pBlock1));
  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, &pBlock2));

  pBlock2->info.id.uid = uid;
  pBlock1->info.id.uid = uid;

  STREAM_CHECK_RET_GOTO(scanWalOneVer(pVnode, pBlock1, pBlock2, ver, uid, window));
  printDataBlock(pBlock2, __func__, "");

  // Hand the result to the caller and keep the cleanup below away from it.
  *pBlock = pBlock2;
  pBlock2 = NULL;

end:
  STREAM_PRINT_LOG_END(code, lino);
  blockDataDestroy(pBlock1);
  blockDataDestroy(pBlock2);
  taosArrayDestroy(schemas);
  return code;
}
745

746
static int32_t createTSAndCondition(int64_t start, int64_t end, SLogicConditionNode** pCond,
263,149✔
747
                                    STargetNode* pTargetNodeTs) {
748
  int32_t code = 0;
263,149✔
749
  int32_t lino = 0;
263,149✔
750

751
  SColumnNode*         pCol = NULL;
263,149✔
752
  SColumnNode*         pCol1 = NULL;
263,149✔
753
  SValueNode*          pVal = NULL;
263,149✔
754
  SValueNode*          pVal1 = NULL;
263,149✔
755
  SOperatorNode*       op = NULL;
263,149✔
756
  SOperatorNode*       op1 = NULL;
263,149✔
757
  SLogicConditionNode* cond = NULL;
263,149✔
758

759
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pCol));
263,149!
760
  pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
263,149✔
761
  pCol->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
263,149✔
762
  pCol->node.resType.bytes = LONG_BYTES;
263,149✔
763
  pCol->slotId = pTargetNodeTs->slotId;
263,149✔
764
  pCol->dataBlockId = pTargetNodeTs->dataBlockId;
263,149✔
765

766
  STREAM_CHECK_RET_GOTO(nodesCloneNode((SNode*)pCol, (SNode**)&pCol1));
263,149!
767

768
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&pVal));
263,149!
769
  pVal->node.resType.type = TSDB_DATA_TYPE_BIGINT;
263,149✔
770
  pVal->node.resType.bytes = LONG_BYTES;
263,149✔
771
  pVal->datum.i = start;
263,149✔
772
  pVal->typeData = start;
263,149✔
773

774
  STREAM_CHECK_RET_GOTO(nodesCloneNode((SNode*)pVal, (SNode**)&pVal1));
263,149!
775
  pVal1->datum.i = end;
263,149✔
776
  pVal1->typeData = end;
263,149✔
777

778
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&op));
263,149!
779
  op->opType = OP_TYPE_GREATER_EQUAL;
263,149✔
780
  op->node.resType.type = TSDB_DATA_TYPE_BOOL;
263,149✔
781
  op->node.resType.bytes = CHAR_BYTES;
263,149✔
782
  op->pLeft = (SNode*)pCol;
263,149✔
783
  op->pRight = (SNode*)pVal;
263,149✔
784
  pCol = NULL;
263,149✔
785
  pVal = NULL;
263,149✔
786

787
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&op1));
263,149!
788
  op1->opType = OP_TYPE_LOWER_EQUAL;
263,149✔
789
  op1->node.resType.type = TSDB_DATA_TYPE_BOOL;
263,149✔
790
  op1->node.resType.bytes = CHAR_BYTES;
263,149✔
791
  op1->pLeft = (SNode*)pCol1;
263,149✔
792
  op1->pRight = (SNode*)pVal1;
263,149✔
793
  pCol1 = NULL;
263,149✔
794
  pVal1 = NULL;
263,149✔
795

796
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_LOGIC_CONDITION, (SNode**)&cond));
263,149!
797
  cond->condType = LOGIC_COND_TYPE_AND;
263,149✔
798
  cond->node.resType.type = TSDB_DATA_TYPE_BOOL;
263,149✔
799
  cond->node.resType.bytes = CHAR_BYTES;
263,149✔
800
  STREAM_CHECK_RET_GOTO(nodesMakeList(&cond->pParameterList));
263,149!
801
  STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)op));
263,149!
802
  op = NULL;
263,149✔
803
  STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)op1));
263,149!
804
  op1 = NULL;
263,149✔
805

806
  *pCond = cond;
263,149✔
807

808
end:
263,149✔
809
  if (code != 0) {
263,149!
810
    nodesDestroyNode((SNode*)pCol);
×
811
    nodesDestroyNode((SNode*)pCol1);
×
812
    nodesDestroyNode((SNode*)pVal);
×
813
    nodesDestroyNode((SNode*)pVal1);
×
814
    nodesDestroyNode((SNode*)op);
×
815
    nodesDestroyNode((SNode*)op1);
×
816
    nodesDestroyNode((SNode*)cond);
×
817
  }
818
  STREAM_PRINT_LOG_END(code, lino);
263,149!
819

820
  return code;
263,149✔
821
}
822

823
/*
 * Build an OR condition with one `ts >= skey AND ts <= ekey` branch per entry of
 * data->pStreamPesudoFuncVals and return it through *pCond.
 *
 * For every entry, data->curIdx is set to the entry's index and calcTimeRange() computes the
 * window; entries whose range is reported invalid are logged and skipped.
 * Fails with TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN when pTargetNodeTs is NULL.
 */
static int32_t createExternalConditions(SStreamRuntimeFuncInfo* data, SLogicConditionNode** pCond, STargetNode* pTargetNodeTs, STimeRangeNode* node) {
  int32_t              code = 0;
  int32_t              lino = 0;
  SLogicConditionNode* pRangeCond = NULL;  // branch under construction, not yet attached
  SLogicConditionNode* pOrCond = NULL;

  if (NULL == pTargetNodeTs) {
    vError("stream reader %s no ts column", __func__);
    return TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN;
  }

  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_LOGIC_CONDITION, (SNode**)&pOrCond));
  pOrCond->condType = LOGIC_COND_TYPE_OR;
  pOrCond->node.resType.type = TSDB_DATA_TYPE_BOOL;
  pOrCond->node.resType.bytes = CHAR_BYTES;
  STREAM_CHECK_RET_GOTO(nodesMakeList(&pOrCond->pParameterList));

  for (int i = 0; i < taosArrayGetSize(data->pStreamPesudoFuncVals); ++i) {
    SReadHandle rangeHolder = {0};

    data->curIdx = i;
    calcTimeRange(node, data, &rangeHolder.winRange, &rangeHolder.winRangeValid);
    if (!rangeHolder.winRangeValid) {
      stError("stream reader %s invalid time range, skey:%" PRId64 ", ekey:%" PRId64, __func__, rangeHolder.winRange.skey,
              rangeHolder.winRange.ekey);
      continue;
    }

    STREAM_CHECK_RET_GOTO(createTSAndCondition(rangeHolder.winRange.skey, rangeHolder.winRange.ekey, &pRangeCond, pTargetNodeTs));
    stDebug("%s create condition skey:%" PRId64 ", eksy:%" PRId64, __func__, rangeHolder.winRange.skey, rangeHolder.winRange.ekey);
    STREAM_CHECK_RET_GOTO(nodesListAppend(pOrCond->pParameterList, (SNode*)pRangeCond));
    pRangeCond = NULL;  // attached to the OR node
  }

  *pCond = pOrCond;

end:
  if (code != 0) {
    nodesDestroyNode((SNode*)pRangeCond);
    nodesDestroyNode((SNode*)pOrCond);
  }
  STREAM_PRINT_LOG_END(code, lino);

  return code;
}
866

867
static int32_t processCalaTimeRange(SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo, SResFetchReq* req,
14,356✔
868
                                    STimeRangeNode* node, SReadHandle* handle) {
869
  int32_t code = 0;
14,356✔
870
  int32_t lino = 0;
14,356✔
871
  SArray* funcVals = NULL;
14,356✔
872
  if (req->pStRtFuncInfo->withExternalWindow) {
14,356✔
873
    nodesDestroyNode(sStreamReaderCalcInfo->tsConditions);
188✔
874
    filterFreeInfo(sStreamReaderCalcInfo->pFilterInfo);
188✔
875
    sStreamReaderCalcInfo->pFilterInfo = NULL;
188✔
876

877
    STREAM_CHECK_RET_GOTO(createExternalConditions(req->pStRtFuncInfo,
188!
878
                                                   (SLogicConditionNode**)&sStreamReaderCalcInfo->tsConditions,
879
                                                   sStreamReaderCalcInfo->pTargetNodeTs, node));
880

881
    STREAM_CHECK_RET_GOTO(filterInitFromNode((SNode*)sStreamReaderCalcInfo->tsConditions,
188!
882
                                             (SFilterInfo**)&sStreamReaderCalcInfo->pFilterInfo,
883
                                             FLT_OPTION_NO_REWRITE | FLT_OPTION_SCALAR_MODE, NULL));
884
    SSTriggerCalcParam* pFirst = taosArrayGet(req->pStRtFuncInfo->pStreamPesudoFuncVals, 0);
188✔
885
    SSTriggerCalcParam* pLast = taosArrayGetLast(req->pStRtFuncInfo->pStreamPesudoFuncVals);
188✔
886
    STREAM_CHECK_NULL_GOTO(pFirst, terrno);
188!
887
    STREAM_CHECK_NULL_GOTO(pLast, terrno);
188!
888

889
    handle->winRange.skey = pFirst->wstart;
188✔
890
    handle->winRange.ekey = pLast->wend;
188✔
891
    handle->winRangeValid = true;
188✔
892
    stDebug("%s withExternalWindow is true, skey:%" PRId64 ", ekey:%" PRId64, __func__, pFirst->wstart, pLast->wend);
188✔
893
  } else {
894
    calcTimeRange(node, req->pStRtFuncInfo, &handle->winRange, &handle->winRangeValid);
14,168✔
895
  }
896

897
end:
14,356✔
898
  taosArrayDestroy(funcVals);
14,356✔
899
  return code;
14,356✔
900
}
901

902
/*
 * Create the result block used for WAL meta rows.
 *
 * Column layout, in slot order:
 *   type (TINYINT), [gid (BIGINT), only when !isVTable], uid, skey, ekey, ver, nrows (all BIGINT)
 *
 * The block is returned through *pBlock; the temporary schema array is always released here.
 */
static int32_t createBlockForWalMeta(SSDataBlock** pBlock, bool isVTable) {
  int32_t code = 0;
  int32_t lino = 0;
  SArray* schemas = taosArrayInit(8, sizeof(SSchema));

  STREAM_CHECK_NULL_GOTO(schemas, terrno);

  int32_t index = 0;
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TINYINT, CHAR_BYTES, index++));  // type

  // All remaining columns are BIGINT: uid, skey, ekey, ver, nrows, preceded by gid unless the
  // block is for a virtual table.
  int32_t numOfBigintCols = isVTable ? 5 : 6;
  for (int32_t k = 0; k < numOfBigintCols; ++k) {
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++));
  }

  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, pBlock));

end:
  taosArrayDestroy(schemas);
  return code;
}
927

928
static int32_t createOptionsForLastTs(SStreamTriggerReaderTaskInnerOptions* options,
351✔
929
                                      SStreamTriggerReaderInfo*             sStreamReaderInfo) {
930
  int32_t code = 0;
351✔
931
  int32_t lino = 0;
351✔
932
  SArray* schemas = NULL;
351✔
933

934
  schemas = taosArrayInit(4, sizeof(SSchema));
351✔
935
  STREAM_CHECK_NULL_GOTO(schemas, terrno)
355!
936
  STREAM_CHECK_RET_GOTO(
355!
937
      qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, PRIMARYKEY_TIMESTAMP_COL_ID))  // last ts
938

939
  BUILD_OPTION(op, sStreamReaderInfo, -1, true, TSDB_ORDER_DESC, INT64_MIN, INT64_MAX, schemas, true,
354✔
940
               STREAM_SCAN_GROUP_ONE_BY_ONE, 0, sStreamReaderInfo->uidList == NULL, NULL);
941
  schemas = NULL;
354✔
942
  *options = op;
354✔
943

944
end:
354✔
945
  taosArrayDestroy(schemas);
354✔
946
  return code;
355✔
947
}
948

949
static int32_t createOptionsForFirstTs(SStreamTriggerReaderTaskInnerOptions* options,
337✔
950
                                       SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t start, int64_t ver) {
951
  int32_t code = 0;
337✔
952
  int32_t lino = 0;
337✔
953
  SArray* schemas = NULL;
337✔
954

955
  schemas = taosArrayInit(4, sizeof(SSchema));
337✔
956
  STREAM_CHECK_NULL_GOTO(schemas, terrno)
337!
957
  STREAM_CHECK_RET_GOTO(
337!
958
      qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, PRIMARYKEY_TIMESTAMP_COL_ID))  // first ts
959

960
  BUILD_OPTION(op, sStreamReaderInfo, ver, true, TSDB_ORDER_ASC, start, INT64_MAX, schemas, true,
337✔
961
               STREAM_SCAN_GROUP_ONE_BY_ONE, 0, sStreamReaderInfo->uidList == NULL, NULL);
962
  schemas = NULL;
337✔
963

964
  *options = op;
337✔
965
end:
337✔
966
  taosArrayDestroy(schemas);
337✔
967
  return code;
337✔
968
}
969

970
static int32_t createOptionsForTsdbMeta(SStreamTriggerReaderTaskInnerOptions* options,
1,204✔
971
                                        SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t start, int64_t end,
972
                                        int64_t gid, int8_t order, int64_t ver, bool onlyTs) {
973
  int32_t code = 0;
1,204✔
974
  int32_t lino = 0;
1,204✔
975
  SArray* schemas = NULL;
1,204✔
976

977
  int32_t index = 1;
1,204✔
978
  schemas = taosArrayInit(8, sizeof(SSchema));
1,204✔
979
  STREAM_CHECK_NULL_GOTO(schemas, terrno);
1,204!
980
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, index++))  // skey
1,204!
981
  if (!onlyTs){
1,204✔
982
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, index++))  // ekey
602!
983
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // uid
602!
984
    if (sStreamReaderInfo->uidList == NULL) {
602✔
985
      STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_UBIGINT, LONG_BYTES, index++))  // gid
127!
986
    }
987
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))     // nrows
602!
988
  }
989
  
990
  BUILD_OPTION(op, sStreamReaderInfo, ver, true, order, start, end, schemas, true, (gid != 0 ? STREAM_SCAN_GROUP_ONE_BY_ONE : STREAM_SCAN_ALL), gid,
1,204✔
991
               true, sStreamReaderInfo->uidList);
992
  schemas = NULL;
1,204✔
993
  *options = op;
1,204✔
994

995
end:
1,204✔
996
  taosArrayDestroy(schemas);
1,204✔
997
  return code;
1,204✔
998
}
999

1000
/*
 * Ascending comparator for arrays whose elements start with an int64_t key.
 *
 * Only the first int64_t behind each pointer is read. Returns -1, 0 or 1 when the first key is
 * smaller than, equal to or greater than the second. The pointers are read through
 * const-qualified types so the qualifier of the parameters is not cast away.
 */
static int taosCompareInt64Asc(const void* elem1, const void* elem2) {
  const int64_t lhs = *(const int64_t*)elem1;
  const int64_t rhs = *(const int64_t*)elem2;

  // Comparison-based instead of subtraction-based so extreme values cannot overflow.
  return (lhs > rhs) - (lhs < rhs);
}
1010

1011
static int32_t getTableList(SArray** pList, int32_t* pNum, int64_t* suid, int32_t index,
104✔
1012
                            SStreamTriggerReaderInfo* sStreamReaderInfo) {
1013
  int32_t code = 0;
104✔
1014
  int32_t lino = 0;
104✔
1015

1016
  int32_t* start = taosArrayGet(sStreamReaderInfo->uidListIndex, index);
104✔
1017
  STREAM_CHECK_NULL_GOTO(start, terrno);
104!
1018
  int32_t  iStart = *start;
104✔
1019
  int32_t* end = taosArrayGet(sStreamReaderInfo->uidListIndex, index + 1);
104✔
1020
  STREAM_CHECK_NULL_GOTO(end, terrno);
104!
1021
  int32_t iEnd = *end;
104✔
1022
  *pList = taosArrayInit(iEnd - iStart, sizeof(STableKeyInfo));
104✔
1023
  STREAM_CHECK_NULL_GOTO(*pList, terrno);
104!
1024
  for (int32_t i = iStart; i < iEnd; ++i) {
363✔
1025
    int64_t*       uid = taosArrayGet(sStreamReaderInfo->uidList, i);
259✔
1026
    STableKeyInfo* info = taosArrayReserve(*pList, 1);
259✔
1027
    STREAM_CHECK_NULL_GOTO(info, terrno);
259!
1028
    *suid = uid[0];
259✔
1029
    info->uid = uid[1];
259✔
1030
  }
1031
  *pNum = iEnd - iStart;
104✔
1032
end:
104✔
1033
  STREAM_PRINT_LOG_END(code, lino);
104!
1034
  return code;
104✔
1035
}
1036

1037
static int32_t processTsNonVTable(SVnode* pVnode, SStreamTsResponse* tsRsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
586✔
1038
                                  SStreamReaderTaskInner* pTask) {
1039
  int32_t code = 0;
586✔
1040
  int32_t lino = 0;
586✔
1041

1042
  tsRsp->tsInfo = taosArrayInit(qStreamGetTableListGroupNum(pTask->pTableList), sizeof(STsInfo));
586✔
1043
  STREAM_CHECK_NULL_GOTO(tsRsp->tsInfo, terrno);
588!
1044
  while (true) {
587✔
1045
    bool hasNext = false;
1,175✔
1046
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTask, &hasNext));
1,175!
1047
    if (hasNext) {
1,174✔
1048
      pTask->api.tsdReader.tsdReaderReleaseDataBlock(pTask->pReader);
672✔
1049
      STsInfo* tsInfo = taosArrayReserve(tsRsp->tsInfo, 1);
672✔
1050
      STREAM_CHECK_NULL_GOTO(tsInfo, terrno)
671!
1051
      if (pTask->options.order == TSDB_ORDER_ASC) {
671✔
1052
        tsInfo->ts = pTask->pResBlock->info.window.skey;
549✔
1053
      } else {
1054
        tsInfo->ts = pTask->pResBlock->info.window.ekey;
122✔
1055
      }
1056
      tsInfo->gId = qStreamGetGroupId(pTask->pTableList, pTask->pResBlock->info.id.uid);
671✔
1057
      stDebug("vgId:%d %s get last ts:%" PRId64 ", gId:%" PRIu64 ", ver:%" PRId64, TD_VID(pVnode), __func__, tsInfo->ts,
672✔
1058
              tsInfo->gId, tsRsp->ver);
1059
    }
1060

1061
    pTask->currentGroupIndex++;
1,174✔
1062
    if (pTask->currentGroupIndex >= qStreamGetTableListGroupNum(pTask->pTableList)) {
1,174✔
1063
      break;
588✔
1064
    }
1065
    STREAM_CHECK_RET_GOTO(resetTsdbReader(pTask));
586!
1066
  }
1067

1068
end:
588✔
1069
  return code;
588✔
1070
}
1071

1072
static int32_t processTsVTable(SVnode* pVnode, SStreamTsResponse* tsRsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
104✔
1073
                               SStreamReaderTaskInner* pTask, int64_t ver) {
1074
  int32_t code = 0;
104✔
1075
  int32_t lino = 0;
104✔
1076
  int64_t suid = 0;
104✔
1077
  SArray* pList = NULL;
104✔
1078
  int32_t pNum = 0;
104✔
1079

1080
  tsRsp->tsInfo = taosArrayInit(taosArrayGetSize(sStreamReaderInfo->uidList), sizeof(STsInfo));
104✔
1081
  STREAM_CHECK_NULL_GOTO(tsRsp->tsInfo, terrno);
104!
1082
  for (int32_t i = 0; i < taosArrayGetSize(sStreamReaderInfo->uidListIndex) - 1; ++i) {
208✔
1083
    STREAM_CHECK_RET_GOTO(getTableList(&pList, &pNum, &suid, i, sStreamReaderInfo));
104!
1084

1085
    cleanupQueryTableDataCond(&pTask->cond);
104✔
1086
    STREAM_CHECK_RET_GOTO(qStreamInitQueryTableDataCond(&pTask->cond, pTask->options.order, pTask->options.schemas,
104!
1087
                                                        pTask->options.isSchema, pTask->options.twindows, suid, ver));
1088
    STREAM_CHECK_RET_GOTO(pTask->api.tsdReader.tsdReaderOpen(
104!
1089
        pVnode, &pTask->cond, taosArrayGet(pList, 0), pNum, pTask->pResBlock, (void**)&pTask->pReader, pTask->idStr, NULL));
1090
    taosArrayDestroy(pList);
104✔
1091
    pList = NULL;
104✔
1092
    while (true) {
184✔
1093
      bool hasNext = false;
288✔
1094
      STREAM_CHECK_RET_GOTO(getTableDataInfo(pTask, &hasNext));
288!
1095
      if (!hasNext) {
288✔
1096
        break;
104✔
1097
      }
1098
      pTask->api.tsdReader.tsdReaderReleaseDataBlock(pTask->pReader);
184✔
1099
      STsInfo* tsInfo = taosArrayReserve(tsRsp->tsInfo, 1);
184✔
1100
      STREAM_CHECK_NULL_GOTO(tsInfo, terrno)
184!
1101
      if (pTask->options.order == TSDB_ORDER_ASC) {
184✔
1102
        tsInfo->ts = pTask->pResBlock->info.window.skey;
118✔
1103
      } else {
1104
        tsInfo->ts = pTask->pResBlock->info.window.ekey;
66✔
1105
      }
1106
      tsInfo->gId = pTask->pResBlock->info.id.uid;
184✔
1107
      stDebug("vgId:%d %s get vtable last ts:%" PRId64 ", uid:%" PRIu64 ", ver:%" PRId64, TD_VID(pVnode), __func__,
184✔
1108
              tsInfo->ts, tsInfo->gId, tsRsp->ver);
1109
    }
1110
    pTask->api.tsdReader.tsdReaderClose(pTask->pReader);
104✔
1111
    pTask->pReader = NULL;
104✔
1112
  }
1113

1114
end:
104✔
1115
  taosArrayDestroy(pList);
104✔
1116
  return code;
104✔
1117
}
1118

1119
/*
 * Point `options` at a specific table.
 *
 * A non-zero `suid` overrides options->suid; `uid` always overrides options->uid. The table type
 * is then derived from the resulting suid: child table when it is non-zero, normal table
 * otherwise.
 */
static void reSetUid(SStreamTriggerReaderTaskInnerOptions* options, int64_t suid, int64_t uid) {
  if (suid != 0) {
    options->suid = suid;
  }
  options->uid = uid;
  options->tableType = (options->suid != 0) ? TD_CHILD_TABLE : TD_NORMAL_TABLE;
}
464✔
1128

1129
static int32_t createOptionsForTsdbData(SVnode* pVnode, SStreamTriggerReaderTaskInnerOptions* options,
427✔
1130
                                        SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t uid, SArray* cols,
1131
                                        int8_t order,int64_t skey, int64_t ekey, int64_t ver) {
1132
  int32_t code = 0;
427✔
1133
  int32_t lino = 0;
427✔
1134
  SArray* schemas = NULL;
427✔
1135

1136
  STREAM_CHECK_RET_GOTO(buildScheamFromMeta(pVnode, uid, &schemas));
427!
1137
  STREAM_CHECK_RET_GOTO(shrinkScheams(cols, schemas));
427!
1138
  BUILD_OPTION(op, sStreamReaderInfo, ver, true, order, skey, ekey, schemas, true, STREAM_SCAN_ALL, 0, false, NULL);
427✔
1139
  *options = op;
427✔
1140

1141
end:
427✔
1142
  return code;
427✔
1143
}
1144

1145
/*
 * Handle a "set table" pull request: install the uid list carried by the request into the reader
 * and rebuild the structures derived from it.
 *
 *   uidList      - swapped in from the request, then sorted by the first int64_t of each entry
 *   uidListIndex - start index of every run of entries sharing the same first value (an entry
 *                  whose first value is 0 always starts a new run), followed by the entry count
 *   uidHash      - second int64_t of each entry -> its position in uidList
 *
 * A response carrying the result code (and no payload) is always sent.
 */
static int32_t vnodeProcessStreamSetTableReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;

  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  TSWAP(sStreamReaderInfo->uidList, req->setTableReq.uids);
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidList, TSDB_CODE_INVALID_PARA);

  taosArraySort(sStreamReaderInfo->uidList, taosCompareInt64Asc);

  // Reuse the derived containers when they already exist, otherwise create them.
  if (NULL == sStreamReaderInfo->uidListIndex) {
    sStreamReaderInfo->uidListIndex = taosArrayInit(taosArrayGetSize(sStreamReaderInfo->uidList), sizeof(int32_t));
    STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidListIndex, TSDB_CODE_INVALID_PARA);
  } else {
    taosArrayClear(sStreamReaderInfo->uidListIndex);
  }

  if (NULL == sStreamReaderInfo->uidHash) {
    sStreamReaderInfo->uidHash = taosHashInit(taosArrayGetSize(sStreamReaderInfo->uidList),
                                              taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidHash, TSDB_CODE_INVALID_PARA);
  } else {
    taosHashClear(sStreamReaderInfo->uidHash);
  }

  int64_t prevSuid = 0;
  int32_t numOfEntries = taosArrayGetSize(sStreamReaderInfo->uidList);
  for (int32_t pos = 0; pos < numOfEntries; ++pos) {
    int64_t* entry = taosArrayGet(sStreamReaderInfo->uidList, pos);
    STREAM_CHECK_NULL_GOTO(entry, terrno);

    // A new run starts unless this entry continues a non-zero value seen on the previous entry.
    bool continuesRun = (entry[0] != 0 && entry[0] == prevSuid);
    if (!continuesRun) {
      STREAM_CHECK_NULL_GOTO(taosArrayPush(sStreamReaderInfo->uidListIndex, &pos), terrno);
    }
    prevSuid = entry[0];
    stDebug("vgId:%d %s suid:%" PRId64 ",uid:%" PRId64 ", index:%d", TD_VID(pVnode), __func__, prevSuid, entry[1], pos);
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->uidHash, entry + 1, LONG_BYTES, &pos, sizeof(int32_t)));
  }
  // Terminating index so that consecutive index pairs always delimit a run.
  STREAM_CHECK_NULL_GOTO(taosArrayPush(sStreamReaderInfo->uidListIndex, &numOfEntries), terrno);

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg reply = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&reply);
  return code;
}
1195

1196
/*
 * Handle a "last ts" pull request: scan in descending order and report one timestamp per group
 * (or per block for a uid-list reader) together with the vnode's applied version.
 *
 * A response is always sent; on failure it carries the error code and no payload. The inner task
 * and the collected timestamp list are released before returning.
 */
static int32_t vnodeProcessStreamLastTsReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  SStreamReaderTaskInner* pTaskInner = NULL;
  SStreamTsResponse       lastTsRsp = {0};
  void*                   buf = NULL;
  size_t                  size = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;

  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStreamTriggerReaderTaskInnerOptions options = {0};
  STREAM_CHECK_RET_GOTO(createOptionsForLastTs(&options, sStreamReaderInfo));
  SStorageAPI api = {0};
  initStorageAPI(&api);
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, NULL, &api));

  lastTsRsp.ver = pVnode->state.applied;
  // A reader with an explicit uid list takes the per-table path, otherwise the per-group path.
  if (sStreamReaderInfo->uidList != NULL) {
    STREAM_CHECK_RET_GOTO(processTsVTable(pVnode, &lastTsRsp, sStreamReaderInfo, pTaskInner, -1));
  } else {
    STREAM_CHECK_RET_GOTO(processTsNonVTable(pVnode, &lastTsRsp, sStreamReaderInfo, pTaskInner));
  }
  ST_TASK_DLOG("vgId:%d %s get result", TD_VID(pVnode), __func__);
  STREAM_CHECK_RET_GOTO(buildTsRsp(&lastTsRsp, &buf, &size));

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  taosArrayDestroy(lastTsRsp.tsInfo);
  releaseStreamTask(&pTaskInner);
  return code;
}
1234

1235
/*
 * Handle a "first ts" pull request: scan in ascending order from req->firstTsReq.startTime at
 * version req->firstTsReq.ver and report one timestamp per group (or per block for a uid-list
 * reader) together with the vnode's applied version.
 *
 * A response is always sent; on failure it carries the error code and no payload. The inner task
 * and the collected timestamp list are released before returning.
 */
static int32_t vnodeProcessStreamFirstTsReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  SStreamReaderTaskInner* pInner = NULL;
  SStreamTsResponse       tsResult = {0};
  void*                   payload = NULL;
  size_t                  payloadLen = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStreamTriggerReaderTaskInnerOptions scanOpts = {0};
  STREAM_CHECK_RET_GOTO(createOptionsForFirstTs(&scanOpts, sStreamReaderInfo, req->firstTsReq.startTime, req->firstTsReq.ver));

  SStorageAPI storageApi = {0};
  initStorageAPI(&storageApi);
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &scanOpts, &pInner, NULL, NULL, &storageApi));

  tsResult.ver = pVnode->state.applied;
  // A reader with an explicit uid list takes the per-table path, otherwise the per-group path.
  if (NULL == sStreamReaderInfo->uidList) {
    STREAM_CHECK_RET_GOTO(processTsNonVTable(pVnode, &tsResult, sStreamReaderInfo, pInner));
  } else {
    STREAM_CHECK_RET_GOTO(processTsVTable(pVnode, &tsResult, sStreamReaderInfo, pInner, req->firstTsReq.ver));
  }

  ST_TASK_DLOG("vgId:%d %s get result", TD_VID(pVnode), __func__);
  STREAM_CHECK_RET_GOTO(buildTsRsp(&tsResult, &payload, &payloadLen));

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg reply = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = payload, .contLen = payloadLen, .code = code};
  tmsgSendRsp(&reply);
  taosArrayDestroy(tsResult.tsInfo);
  releaseStreamTask(&pInner);
  return code;
}
1271

1272
/*
 * Handle a TSDB meta pull request: return one row (skey, ekey, uid, [gid], nrows) per data block
 * found by the scan, at most STREAM_RETURN_ROWS_NUM rows per response.
 *
 * For a request of type STRIGGER_PULL_TSDB_META a new inner task is created and stored in
 * sStreamReaderInfo->streamTaskMap under the session key; for any other type the task stored
 * under that key is looked up and the scan continues from where it stopped. Once the scan
 * reports no further block, the task is removed from the map.
 *
 * A response is always sent; on failure it carries the error code.
 */
static int32_t vnodeProcessStreamTsdbMetaReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;
  SStreamTriggerReaderTaskInnerOptions options = {0};

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStreamReaderTaskInner* pTaskInner = NULL;
  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_META);

  if (req->base.type == STRIGGER_PULL_TSDB_META) {
    SStreamTriggerReaderTaskInnerOptions optionsTs = {0};

    // The scan itself only reads the timestamp column ...
    STREAM_CHECK_RET_GOTO(createOptionsForTsdbMeta(&optionsTs, sStreamReaderInfo, req->tsdbMetaReq.startTime,
      req->tsdbMetaReq.endTime, req->tsdbMetaReq.gid, req->tsdbMetaReq.order, req->tsdbMetaReq.ver, true));
    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &optionsTs, &pTaskInner, NULL, sStreamReaderInfo->groupIdMap, &api));

    code = taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner));
    if (code != 0) {
      // The map did not take the task, so nothing else will release it.
      releaseStreamTask(&pTaskInner);
      lino = __LINE__;
      goto end;
    }

    // ... while the destination block uses the full meta layout.
    STREAM_CHECK_RET_GOTO(createOptionsForTsdbMeta(&options, sStreamReaderInfo, req->tsdbMetaReq.startTime,
      req->tsdbMetaReq.endTime, req->tsdbMetaReq.gid, req->tsdbMetaReq.order, req->tsdbMetaReq.ver, false));
    STREAM_CHECK_RET_GOTO(createDataBlockForStream(options.schemas, &pTaskInner->pResBlockDst));
  } else {
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  }

  blockDataCleanup(pTaskInner->pResBlockDst);
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pTaskInner->pResBlockDst, STREAM_RETURN_ROWS_NUM));
  bool hasNext = true;
  while (true) {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
    if (!hasNext) {
      break;
    }
    pTaskInner->api.tsdReader.tsdReaderReleaseDataBlock(pTaskInner->pReader);
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);

    int32_t index = 0;
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.window.skey));
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.window.ekey));
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.id.uid));
    // The gid column exists only when the reader has no explicit uid list.
    if (sStreamReaderInfo->uidList == NULL) {
      STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.id.groupId));
    }
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.rows));

    stDebug("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
    pTaskInner->pResBlockDst->info.rows++;
    // Response is full; hasNext stays true so the task is kept in the map.
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
      break;
    }
  }

  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
  if (!hasNext) {
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  taosArrayDestroy(options.schemas);
  return code;
}
1349

1350
/*
 * Serve a TSDB "ts data" pull request.
 *
 * Reader options are built from the request (version and [skey, ekey] over
 * sStreamReaderInfo->triggerCols) and re-targeted at the requested suid/uid.
 * The reader is then drained block by block: the group id of the current
 * result block is filled in, tag expressions are applied when the scanned
 * block has rows, and the block is filtered and merged into the task's
 * destination block. Finally the merged data is transformed into a block
 * created from sStreamReaderInfo->tsBlock and serialized as the payload.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP reply is sent on every path, carrying the
 * final code and whatever payload (possibly none) was built.
 *
 * @param pVnode            vnode the data is read from
 * @param pMsg              incoming RPC message; its info is echoed in the reply
 * @param req               decoded pull request (tsdbTsDataReq member is used)
 * @param sStreamReaderInfo reader context; NULL makes the call fail with terrno
 * @return 0 on success, otherwise the first error code encountered
 */
static int32_t vnodeProcessStreamTsdbTsDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  SSDataBlock*            pBlockRes = NULL;
  SStreamReaderTaskInner* pTaskInner = NULL;
  size_t                  size = 0;
  void*                   buf = NULL;
  int32_t                 lino = 0;
  int32_t                 code = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  /* pTask is not referenced by name below; presumably the ST_TASK_* log macros pick it up. */
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  BUILD_OPTION(options, sStreamReaderInfo, req->tsdbTsDataReq.ver, true, TSDB_ORDER_ASC, req->tsdbTsDataReq.skey,
               req->tsdbTsDataReq.ekey, sStreamReaderInfo->triggerCols, false, STREAM_SCAN_ALL, 0, true, NULL);
  reSetUid(&options, req->tsdbTsDataReq.suid, req->tsdbTsDataReq.uid);

  SStorageAPI api = {0};
  initStorageAPI(&api);

  STREAM_CHECK_RET_GOTO(
      createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, NULL, &api));
  STREAM_CHECK_RET_GOTO(
      createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->tsBlock, false, &pBlockRes));

  for (;;) {
    bool more = false;
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &more));
    if (!more) {
      break;
    }

    /* Tag the current result block with the group of the table it belongs to. */
    SSDataBlock* pRes = pTaskInner->pResBlock;
    pRes->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pRes->info.id.uid);

    SSDataBlock* pScanned = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pScanned));

    /* Tag expressions are only evaluated for blocks that actually carry rows. */
    bool hasRows = (pScanned != NULL) && (pScanned->info.rows > 0);
    if (hasRows) {
      STREAM_CHECK_RET_GOTO(
          processTag(pVnode, sStreamReaderInfo->pExprInfo, sStreamReaderInfo->numOfExpr, &api, pScanned));
    }

    STREAM_CHECK_RET_GOTO(qStreamFilter(pScanned, pTaskInner->pFilterInfo));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pScanned));
    ST_TASK_DLOG("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
                 TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey,
                 pTaskInner->pResBlock->info.window.ekey, pTaskInner->pResBlock->info.id.uid,
                 pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
  }

  /* Reshape the merged trigger-layout data into the ts-block layout that is sent back. */
  blockDataTransform(pBlockRes, pTaskInner->pResBlockDst);

  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);

  /* The reply is sent unconditionally so the requester also learns about failures. */
  SRpcMsg rsp = {0};
  rsp.msgType = TDMT_STREAM_TRIGGER_PULL_RSP;
  rsp.info = pMsg->info;
  rsp.pCont = buf;
  rsp.contLen = size;
  rsp.code = code;
  tmsgSendRsp(&rsp);

  /* This task is never stored in streamTaskMap, so it is released here. */
  blockDataDestroy(pBlockRes);
  releaseStreamTask(&pTaskInner);
  return code;
}
1407

1408
/*
 * Serve a TSDB trigger-data pull request, either the initial one or a
 * follow-up that continues an earlier scan.
 *
 * The reader task is keyed in sStreamReaderInfo->streamTaskMap by
 * getSessionKey(sessionId, STRIGGER_PULL_TSDB_TRIGGER_DATA):
 *   - request type STRIGGER_PULL_TSDB_TRIGGER_DATA creates a new task and
 *     stores it in the map;
 *   - any other type looks the existing task up (TSDB_CODE_STREAM_NO_CONTEXT
 *     when it is missing).
 * One scanned block is merged into the task's destination block and returned
 * per call. When the reader reports no more data the task is removed from the
 * map.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP reply is sent on every path.
 *
 * @param pVnode            vnode the data is read from
 * @param pMsg              incoming RPC message; its info is echoed in the reply
 * @param req               decoded pull request (base / tsdbTriggerDataReq members)
 * @param sStreamReaderInfo reader context; NULL makes the call fail with terrno
 * @return 0 on success, otherwise the first error code encountered
 */
static int32_t vnodeProcessStreamTsdbTriggerDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  void*                   buf = NULL;
  size_t                  size = 0;
  SStreamReaderTaskInner* pTaskInner = NULL;
  /* true once streamTaskMap holds the task; until then this function owns it. */
  bool                    taskInMap = false;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  int64_t key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_TRIGGER_DATA);

  if (req->base.type == STRIGGER_PULL_TSDB_TRIGGER_DATA) {
    /* A non-zero gid restricts the scan to one group; otherwise everything is scanned. */
    BUILD_OPTION(options, sStreamReaderInfo, req->tsdbTriggerDataReq.ver, true, req->tsdbTriggerDataReq.order, req->tsdbTriggerDataReq.startTime, INT64_MAX,
                 sStreamReaderInfo->triggerCols, false, (req->tsdbTriggerDataReq.gid != 0 ? STREAM_SCAN_GROUP_ONE_BY_ONE : STREAM_SCAN_ALL),
                 req->tsdbTriggerDataReq.gid, true, NULL);
    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock,
                                           sStreamReaderInfo->groupIdMap, &api));
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
    taskInMap = true;
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
  } else {
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
    taskInMap = true;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  }

  blockDataCleanup(pTaskInner->pResBlockDst);
  bool hasNext = true;
  while (1) {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
    if (!hasNext) {
      break;
    }
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
    pTaskInner->pResBlockDst->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);

    SSDataBlock* pBlock = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
    if (pBlock != NULL && pBlock->info.rows > 0) {
      STREAM_CHECK_RET_GOTO(
        processTag(pVnode, sStreamReaderInfo->pExprInfo, sStreamReaderInfo->numOfExpr, &pTaskInner->api, pBlock));
    }
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, pTaskInner->pFilterInfo));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
    ST_TASK_DLOG("vgId:%d %s get skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
    /* rows >= 0 always holds, so exactly one scanned block is returned per call (left as "todo" upstream). */
    if (pTaskInner->pResBlockDst->info.rows >= 0) { //todo
      break;
    }
  }

  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
  if (!hasNext) {
    /* Scan finished: drop the task from the map so later requests start afresh. */
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);

  /* A task that never made it into the map (e.g. taosHashPut failed) has no other owner. */
  if (!taskInMap) {
    releaseStreamTask(&pTaskInner);
  }
  return code;
}
1478

1479
/*
 * Serve a TSDB calc-data pull request, either the initial one or a follow-up
 * that continues an earlier scan.
 *
 * The reader task is keyed in sStreamReaderInfo->streamTaskMap by
 * getSessionKey(sessionId, STRIGGER_PULL_TSDB_CALC_DATA). Request type
 * STRIGGER_PULL_TSDB_CALC_DATA creates a task scanning [skey, ekey] of group
 * `gid`; any other type resumes the stored task. Scanned blocks are filtered
 * and merged until the reader is exhausted or the destination block reaches
 * STREAM_RETURN_ROWS_NUM rows. The result is transformed into a block created
 * from sStreamReaderInfo->calcResBlock and serialized. When the reader
 * reports no more data the task is removed from the map.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP reply is sent on every path.
 *
 * @param pVnode            vnode the data is read from
 * @param pMsg              incoming RPC message; its info is echoed in the reply
 * @param req               decoded pull request (base / tsdbCalcDataReq members)
 * @param sStreamReaderInfo reader context; NULL makes the call fail with terrno,
 *                          NULL triggerCols fails with
 *                          TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN
 * @return 0 on success, otherwise the first error code encountered
 */
static int32_t vnodeProcessStreamTsdbCalcDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  void*                   buf = NULL;
  size_t                  size = 0;
  SSDataBlock*            pBlockRes = NULL;
  SStreamReaderTaskInner* pTaskInner = NULL;
  /* true once streamTaskMap holds the task; until then this function owns it. */
  bool                    taskInMap = false;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, skey:%"PRId64",ekey:%"PRId64",gid:%"PRId64, TD_VID(pVnode), __func__,
    req->tsdbCalcDataReq.skey, req->tsdbCalcDataReq.ekey, req->tsdbCalcDataReq.gid);

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->triggerCols, TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN);

  int64_t key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_CALC_DATA);

  if (req->base.type == STRIGGER_PULL_TSDB_CALC_DATA) {
    BUILD_OPTION(options, sStreamReaderInfo, req->tsdbCalcDataReq.ver, true, TSDB_ORDER_ASC, req->tsdbCalcDataReq.skey, req->tsdbCalcDataReq.ekey,
                 sStreamReaderInfo->triggerCols, false, STREAM_SCAN_GROUP_ONE_BY_ONE, req->tsdbCalcDataReq.gid, true, NULL);
    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, NULL, &api));

    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
    taskInMap = true;
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
  } else {
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
    taskInMap = true;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  }

  blockDataCleanup(pTaskInner->pResBlockDst);
  bool hasNext = true;
  while (1) {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
    if (!hasNext) {
      break;
    }
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);

    SSDataBlock* pBlock = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, pTaskInner->pFilterInfo));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
    /* Cap the batch; the remaining data is fetched by a follow-up request. */
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
      break;
    }
  }

  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->calcResBlock, false, &pBlockRes));
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlockRes, pTaskInner->pResBlockDst->info.capacity));
  blockDataTransform(pBlockRes, pTaskInner->pResBlockDst);
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlockRes->info.rows);
  if (!hasNext) {
    /* Scan finished: drop the task from the map so later requests start afresh. */
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  blockDataDestroy(pBlockRes);

  /* A task that never made it into the map (e.g. taosHashPut failed) has no other owner. */
  if (!taskInMap) {
    releaseStreamTask(&pTaskInner);
  }
  return code;
}
1547

1548
/*
 * Serve a TSDB data pull request for a single table (virtual-table path),
 * either the initial request or a follow-up that continues an earlier scan.
 *
 * The reader task is keyed in sStreamReaderInfo->streamTaskMap by the
 * requested table uid. Request type STRIGGER_PULL_TSDB_DATA builds options
 * for the requested columns/time range, creates a task, re-initialises its
 * query condition, opens a tsdb reader on exactly that one table and stores
 * the task in the map. Any other type resumes the stored task. Scanned blocks
 * are merged until the reader is exhausted or the destination block reaches
 * STREAM_RETURN_ROWS_NUM rows. When the reader reports no more data the task
 * is removed from the map.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP reply is sent on every path.
 *
 * @param pVnode            vnode the data is read from
 * @param pMsg              incoming RPC message; its info is echoed in the reply
 * @param req               decoded pull request (base / tsdbDataReq members)
 * @param sStreamReaderInfo reader context; NULL makes the call fail with terrno
 * @return 0 on success, otherwise the first error code encountered
 */
static int32_t vnodeProcessStreamTsdbVirtalDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  void*                   buf = NULL;
  size_t                  size = 0;
  SStreamReaderTaskInner* pTaskInner = NULL;
  /* true once streamTaskMap holds the task; until then this function owns it. */
  bool                    taskInMap = false;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  int64_t key = req->tsdbDataReq.uid;

  if (req->base.type == STRIGGER_PULL_TSDB_DATA) {
    SStreamTriggerReaderTaskInnerOptions options = {0};

    STREAM_CHECK_RET_GOTO(createOptionsForTsdbData(pVnode, &options, sStreamReaderInfo, req->tsdbDataReq.uid,
                                                   req->tsdbDataReq.cids, req->tsdbDataReq.order, req->tsdbDataReq.skey,
                                                   req->tsdbDataReq.ekey, req->tsdbDataReq.ver));
    reSetUid(&options, req->tsdbDataReq.suid, req->tsdbDataReq.uid);

    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, NULL, &api));

    /* Rebuild the query condition from the task's own options and open a reader on the single requested table. */
    STableKeyInfo       keyInfo = {.uid = req->tsdbDataReq.uid};
    cleanupQueryTableDataCond(&pTaskInner->cond);
    STREAM_CHECK_RET_GOTO(qStreamInitQueryTableDataCond(&pTaskInner->cond, pTaskInner->options.order, pTaskInner->options.schemas,
                                                        pTaskInner->options.isSchema, pTaskInner->options.twindows,
                                                        pTaskInner->options.suid, pTaskInner->options.ver));
    STREAM_CHECK_RET_GOTO(pTaskInner->api.tsdReader.tsdReaderOpen(pVnode, &pTaskInner->cond, &keyInfo, 1, pTaskInner->pResBlock,
                                                             (void**)&pTaskInner->pReader, pTaskInner->idStr, NULL));

    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
    taskInMap = true;
    STREAM_CHECK_RET_GOTO(createOneDataBlock(pTaskInner->pResBlock, false, &pTaskInner->pResBlockDst));
  } else {
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
    taskInMap = true;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  }

  blockDataCleanup(pTaskInner->pResBlockDst);
  bool hasNext = true;
  while (1) {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
    if (!hasNext) {
      break;
    }

    SSDataBlock* pBlock = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
    /* Cap the batch; the remaining data is fetched by a follow-up request. */
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
      break;
    }
  }
  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
  if (!hasNext) {
    /* Scan finished: drop the task from the map so later requests start afresh. */
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);

  /*
   * Cond init, reader open or taosHashPut can fail after the task was created
   * but before the map took ownership; release it here in that case.
   */
  if (!taskInMap) {
    releaseStreamTask(&pTaskInner);
  }
  return code;
}
1619

1620
/*
 * Serve a WAL meta pull request: scan the WAL starting from the requested
 * version and return a block of meta rows.
 *
 * For a non-virtual-table reader (uidList == NULL) a table list is built from
 * the reader's suid/uid, tag conditions and a clone of its partition columns
 * and is used for the scan; for a virtual-table reader the scan uses
 * sStreamReaderInfo->uidHash instead.
 *
 * Reply (always sent, TDMT_STREAM_TRIGGER_PULL_RSP):
 *   - rows > 0 : the serialized block, code 0;
 *   - rows == 0: code TSDB_CODE_STREAM_NO_DATA with an 8-byte payload holding
 *                the last version reported by scanWal. This is not an error
 *                for the caller of this function, which still gets 0 back.
 *
 * @param pVnode            vnode whose WAL is scanned
 * @param pMsg              incoming RPC message; its info is echoed in the reply
 * @param req               decoded pull request (walMetaReq member is used)
 * @param sStreamReaderInfo reader context; NULL makes the call fail with terrno
 * @return 0 on success or when there is no data, otherwise an error code
 */
static int32_t vnodeProcessStreamWalMetaReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t      code = 0;
  int32_t      lino = 0;
  void*        buf = NULL;
  size_t       size = 0;
  SSDataBlock* pBlock = NULL;
  void*        pTableList = NULL;
  SNodeList*   groupNew = NULL;
  int64_t      lastVer = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, request paras lastVer:%" PRId64 ",ctime:%" PRId64, TD_VID(pVnode), __func__,
  req->walMetaReq.lastVer, req->walMetaReq.ctime);

  bool isVTable = sStreamReaderInfo->uidList != NULL;
  if (sStreamReaderInfo->uidList == NULL) {
    SStorageAPI api = {0};
    initStorageAPI(&api);
    /* The partition columns are cloned into groupNew, which is destroyed at the end of this call. */
    STREAM_CHECK_RET_GOTO(nodesCloneList(sStreamReaderInfo->partitionCols, &groupNew));
    STREAM_CHECK_RET_GOTO(qStreamCreateTableListForReader(
        pVnode, sStreamReaderInfo->suid, sStreamReaderInfo->uid, sStreamReaderInfo->tableType,
        groupNew, false, sStreamReaderInfo->pTagCond, sStreamReaderInfo->pTagIndexCond, &api,
        &pTableList, sStreamReaderInfo->groupIdMap));
  }

  STREAM_CHECK_RET_GOTO(createBlockForWalMeta(&pBlock, isVTable));
  STREAM_CHECK_RET_GOTO(scanWal(pVnode, isVTable ? sStreamReaderInfo->uidHash : pTableList, isVTable, pBlock,
                                req->walMetaReq.lastVer, sStreamReaderInfo->deleteReCalc,
                                sStreamReaderInfo->deleteOutTbl, req->walMetaReq.ctime, &lastVer));

  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlock->info.rows);
  STREAM_CHECK_RET_GOTO(buildRsp(pBlock, &buf, &size));
  printDataBlock(pBlock, __func__, "");

end:
  if (pBlock != NULL && pBlock->info.rows == 0) {
    /*
     * Empty result: answer with NO_DATA plus the last scanned version.
     * buildRsp may already have produced a payload for the empty block, so
     * drop it before installing the replacement instead of leaking it.
     * NOTE(review): as before, this branch also replaces an earlier error
     * code (e.g. from scanWal) whenever the block is empty - confirm that
     * masking is intended.
     */
    rpcFreeCont(buf);
    size = 0;
    buf = rpcMallocCont(sizeof(int64_t));
    if (buf == NULL) {
      code = terrno;
      lino = __LINE__;
    } else {
      *(int64_t *)buf = lastVer;
      size = sizeof(int64_t);
      code = TSDB_CODE_STREAM_NO_DATA;
    }
  }
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  /* NO_DATA is only meaningful to the remote peer; locally it counts as success. */
  if (code == TSDB_CODE_STREAM_NO_DATA){
    code = 0;
  }
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  nodesDestroyList(groupNew);
  blockDataDestroy(pBlock);
  qStreamDestroyTableList(pTableList);

  return code;
}
1675

1676
/*
 * Serve a WAL data pull request: read the rows of one WAL version for one
 * table, restricted to the requested [skey, ekey] window.
 *
 * The request type selects how the data is produced:
 *   - STRIGGER_PULL_WAL_DATA         : processWalVerDataVTable with the requested column ids;
 *   - STRIGGER_PULL_WAL_CALC_DATA    : processWalVerData shaped like calcResBlock;
 *   - STRIGGER_PULL_WAL_TRIGGER_DATA : processWalVerData shaped like triggerResBlock
 *                                      (boolean argument true);
 *   - STRIGGER_PULL_WAL_TS_DATA      : processWalVerData shaped like tsBlock.
 * Any other type is rejected with TSDB_CODE_INVALID_PARA.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP reply is sent on every path.
 *
 * @param pVnode            vnode whose WAL is read
 * @param pMsg              incoming RPC message; its info is echoed in the reply
 * @param req               decoded pull request (walDataReq member is used)
 * @param sStreamReaderInfo reader context; NULL makes the call fail with terrno
 * @return 0 on success, otherwise the first error code encountered
 */
static int32_t vnodeProcessStreamWalDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t      code = 0;
  int32_t      lino = 0;
  void*        buf = NULL;
  size_t       size = 0;
  SSDataBlock* pBlock = NULL;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, request type:%d skey:%" PRId64 ",ekey:%" PRId64 ",uid:%" PRId64 ",ver:%" PRId64, TD_VID(pVnode),
  __func__, req->walDataReq.base.type, req->walDataReq.skey, req->walDataReq.ekey, req->walDataReq.uid, req->walDataReq.ver);

  STimeWindow window = {.skey = req->walDataReq.skey, .ekey = req->walDataReq.ekey};
  if (req->walDataReq.base.type == STRIGGER_PULL_WAL_DATA){
    STREAM_CHECK_RET_GOTO(processWalVerDataVTable(pVnode, req->walDataReq.cids, req->walDataReq.ver,
      req->walDataReq.uid, &window, &pBlock));
  } else if (req->walDataReq.base.type == STRIGGER_PULL_WAL_CALC_DATA){
    STREAM_CHECK_RET_GOTO(processWalVerData(pVnode, sStreamReaderInfo, req->walDataReq.ver, false,
      req->walDataReq.uid, &window, sStreamReaderInfo->calcResBlock, &pBlock));
  } else if (req->walDataReq.base.type == STRIGGER_PULL_WAL_TRIGGER_DATA) {
    STREAM_CHECK_RET_GOTO(processWalVerData(pVnode, sStreamReaderInfo, req->walDataReq.ver, true,
      req->walDataReq.uid, &window, sStreamReaderInfo->triggerResBlock, &pBlock));
  } else if (req->walDataReq.base.type == STRIGGER_PULL_WAL_TS_DATA){
    STREAM_CHECK_RET_GOTO(processWalVerData(pVnode, sStreamReaderInfo, req->walDataReq.ver, false,
      req->walDataReq.uid, &window, sStreamReaderInfo->tsBlock, &pBlock));
  } else {
    /*
     * No producer ran, so pBlock is still NULL. Fail explicitly instead of
     * dereferencing it in the log line and serializer below.
     */
    code = TSDB_CODE_INVALID_PARA;
    lino = __LINE__;
    goto end;
  }

  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlock->info.rows);
  printDataBlock(pBlock, __func__, "");
  STREAM_CHECK_RET_GOTO(buildRsp(pBlock, &buf, &size));
end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);

  blockDataDestroy(pBlock);
  return code;
}
1716

1717
/*
 * Serve a group-column-value pull request: look up the group info stored for
 * the requested gid in sStreamReaderInfo->groupIdMap and return it serialized.
 *
 * Serialization is done in two passes: the first (NULL buffer) obtains the
 * required length, the second writes into a buffer of that length. On any
 * failure the buffer is freed and an empty payload is sent together with the
 * error code.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP reply is sent on every path.
 *
 * @param pVnode            vnode serving the request (its id is passed to the serializer)
 * @param pMsg              incoming RPC message; its info is echoed in the reply
 * @param req               decoded pull request (groupColValueReq member is used)
 * @param sStreamReaderInfo reader context; NULL makes the call fail with terrno
 * @return 0 on success; TSDB_CODE_STREAM_NO_CONTEXT when the gid is unknown;
 *         otherwise the serializer/allocator error
 */
static int32_t vnodeProcessStreamGroupColValueReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;
  /*
   * Signed holder for the serializer result. Storing it directly in the
   * unsigned `size` made the `< 0` error checks always false, so a failed
   * serialization went unnoticed.
   */
  int32_t len = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, request gid:%" PRId64, TD_VID(pVnode), __func__, req->groupColValueReq.gid);

  SArray** gInfo = taosHashGet(sStreamReaderInfo->groupIdMap, &req->groupColValueReq.gid, POINTER_BYTES);
  STREAM_CHECK_NULL_GOTO(gInfo, TSDB_CODE_STREAM_NO_CONTEXT);
  SStreamGroupInfo pGroupInfo = {0};
  pGroupInfo.gInfo = *gInfo;

  /* Pass 1: ask for the encoded length only. */
  len = tSerializeSStreamGroupInfo(NULL, 0, &pGroupInfo, TD_VID(pVnode));
  STREAM_CHECK_CONDITION_GOTO(len < 0, len);
  buf = rpcMallocCont(len);
  STREAM_CHECK_NULL_GOTO(buf, terrno);
  /* Pass 2: encode into the freshly allocated buffer. */
  len = tSerializeSStreamGroupInfo(buf, len, &pGroupInfo, TD_VID(pVnode));
  STREAM_CHECK_CONDITION_GOTO(len < 0, len);
  size = (size_t)len;
end:
  if (code != 0) {
    rpcFreeCont(buf);
    buf = NULL;
    size = 0;
  }
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);

  return code;
}
1751

1752
/*
 * Serve a virtual-table-info pull request: for every table matched by the
 * reader's table list, report its uid, group id and column references.
 *
 * Which column references are returned depends on the requested column ids:
 *   - exactly one id equal to PRIMARYKEY_TIMESTAMP_COL_ID: every column
 *     reference of the table's meta entry is copied;
 *   - otherwise: one slot per requested id, filled from the first meta column
 *     reference that has `hasRef` set and a matching id. Slots without a match
 *     stay zero-initialised.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP reply is sent on every path.
 *
 * @param pVnode            vnode whose meta store is queried
 * @param pMsg              incoming RPC message; its info is echoed in the reply
 * @param req               decoded pull request (virTableInfoReq member is used)
 * @param sStreamReaderInfo reader context; NULL makes the call fail with terrno
 * @return 0 on success, otherwise the first error code encountered
 */
static int32_t vnodeProcessStreamVTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t              code = 0;
  int32_t              lino = 0;
  void*                buf = NULL;
  size_t               size = 0;
  SStreamMsgVTableInfo vTableInfo = {0};
  SMetaReader          metaReader = {0};
  SNodeList*           groupNew = NULL;
  void*                pTableList = NULL;
  SStorageAPI api = {0};
  initStorageAPI(&api);

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  STREAM_CHECK_RET_GOTO(nodesCloneList(sStreamReaderInfo->partitionCols, &groupNew));
  STREAM_CHECK_RET_GOTO(qStreamCreateTableListForReader(
      pVnode, sStreamReaderInfo->suid, sStreamReaderInfo->uid, sStreamReaderInfo->tableType,
      groupNew, true, sStreamReaderInfo->pTagCond, sStreamReaderInfo->pTagIndexCond, &api,
      &pTableList, sStreamReaderInfo->groupIdMap));

  SArray* cids = req->virTableInfoReq.cids;
  STREAM_CHECK_NULL_GOTO(cids, terrno);

  SArray* pTableListArray = qStreamGetTableArrayList(pTableList);
  STREAM_CHECK_NULL_GOTO(pTableListArray, terrno);

  vTableInfo.infos = taosArrayInit(taosArrayGetSize(pTableListArray), sizeof(VTableInfo));
  STREAM_CHECK_NULL_GOTO(vTableInfo.infos, terrno);
  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);

  /* The requested column ids do not change while iterating, so evaluate them once. */
  const size_t numOfCids = taosArrayGetSize(cids);
  const bool   wantAllRefs =
      (numOfCids == 1 && *(col_id_t*)taosArrayGet(cids, 0) == PRIMARYKEY_TIMESTAMP_COL_ID);

  for (size_t i = 0; i < taosArrayGetSize(pTableListArray); i++) {
    STableKeyInfo* pKeyInfo = taosArrayGet(pTableListArray, i);
    if (pKeyInfo == NULL) {
      continue;
    }

    /*
     * Load the meta entry first and fail on error. Previously the result was
     * stored in `code` but never checked, so a failed lookup silently copied
     * whatever the reader still held from an earlier table.
     */
    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, pKeyInfo->uid));

    VTableInfo* vTable = taosArrayReserve(vTableInfo.infos, 1);
    STREAM_CHECK_NULL_GOTO(vTable, terrno);
    vTable->uid = pKeyInfo->uid;
    vTable->gId = pKeyInfo->groupId;
    vTable->cols.version = metaReader.me.colRef.version;

    if (wantAllRefs) {
      vTable->cols.nCols = metaReader.me.colRef.nCols;
      vTable->cols.pColRef = taosMemoryCalloc(metaReader.me.colRef.nCols, sizeof(SColRef));
      /* Only a non-empty allocation is required to succeed. */
      if (metaReader.me.colRef.nCols > 0) {
        STREAM_CHECK_NULL_GOTO(vTable->cols.pColRef, terrno);
      }
      for (size_t j = 0; j < metaReader.me.colRef.nCols; j++) {
        memcpy(vTable->cols.pColRef + j, &metaReader.me.colRef.pColRef[j], sizeof(SColRef));
      }
    } else {
      vTable->cols.nCols = numOfCids;
      vTable->cols.pColRef = taosMemoryCalloc(numOfCids, sizeof(SColRef));
      if (numOfCids > 0) {
        STREAM_CHECK_NULL_GOTO(vTable->cols.pColRef, terrno);
      }
      for (size_t k = 0; k < numOfCids; k++) {
        col_id_t wanted = *(col_id_t*)taosArrayGet(cids, k);
        for (size_t j = 0; j < metaReader.me.colRef.nCols; j++) {
          if (metaReader.me.colRef.pColRef[j].hasRef && metaReader.me.colRef.pColRef[j].id == wanted) {
            memcpy(vTable->cols.pColRef + k, &metaReader.me.colRef.pColRef[j], sizeof(SColRef));
            break;
          }
        }
      }
    }
    /* Reset the decoder so the reader can load the next table's entry. */
    tDecoderClear(&metaReader.coder);
  }
  ST_TASK_DLOG("vgId:%d %s end", TD_VID(pVnode), __func__);
  STREAM_CHECK_RET_GOTO(buildVTableInfoRsp(&vTableInfo, &buf, &size));

end:
  nodesDestroyList(groupNew);
  qStreamDestroyTableList(pTableList);
  tDestroySStreamMsgVTableInfo(&vTableInfo);
  api.metaReaderFn.clearReader(&metaReader);
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  return code;
}
1832

1833
/*
 * Serve an original-table-info pull request: resolve each requested
 * (table name, column name) pair to (suid, uid, column id).
 *
 * For every requested entry the table is looked up by name. For a child table
 * the super table entry is then loaded by suid and its schema is searched;
 * for a normal table its own schema is searched (suid reported as 0). Any
 * other table type is rejected with TSDB_CODE_INVALID_PARA. A column name
 * that is not found is logged and reported with cid 0; it does not fail the
 * request.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP reply is sent on every path.
 *
 * @param pVnode vnode whose meta store is queried
 * @param pMsg   incoming RPC message; its info is echoed in the reply
 * @param req    decoded pull request (base / origTableInfoReq members)
 * @return 0 on success, otherwise the first error code encountered
 */
static int32_t vnodeProcessStreamOTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req) {
  int32_t                   code = 0;
  int32_t                   lino = 0;
  void*                     buf = NULL;
  size_t                    size = 0;
  SSTriggerOrigTableInfoRsp oTableInfo = {0};
  SMetaReader               metaReader = {0};
  /* Not referenced by name below; presumably consumed by the stsDebug macro - keep. */
  int64_t streamId = req->base.streamId;
  stsDebug("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStorageAPI api = {0};
  initStorageAPI(&api);

  SArray* cols = req->origTableInfoReq.cols;
  STREAM_CHECK_NULL_GOTO(cols, terrno);

  oTableInfo.cols = taosArrayInit(taosArrayGetSize(cols), sizeof(OTableInfoRsp));

  STREAM_CHECK_NULL_GOTO(oTableInfo.cols, terrno);

  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
  for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
    OTableInfo*    oInfo = taosArrayGet(cols, i);
    STREAM_CHECK_NULL_GOTO(oInfo, terrno);
    OTableInfoRsp* vTableInfo = taosArrayReserve(oTableInfo.cols, 1);
    STREAM_CHECK_NULL_GOTO(vTableInfo, terrno);
    /* Start from 0 so the "not found" check below does not rely on how the reserved slot was initialised. */
    vTableInfo->cid = 0;

    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByName(&metaReader, oInfo->refTableName));
    vTableInfo->uid = metaReader.me.uid;
    stsDebug("vgId:%d %s uid:%"PRId64, TD_VID(pVnode), __func__, vTableInfo->uid);

    SSchemaWrapper* sSchemaWrapper = NULL;
    if (metaReader.me.type == TD_CHILD_TABLE) {
      /* A child table's columns are defined by its super table: reload the reader with that entry. */
      int64_t suid = metaReader.me.ctbEntry.suid;
      vTableInfo->suid = suid;
      tDecoderClear(&metaReader.coder);
      STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, suid));
      sSchemaWrapper = &metaReader.me.stbEntry.schemaRow;
    } else if (metaReader.me.type == TD_NORMAL_TABLE) {
      vTableInfo->suid = 0;
      sSchemaWrapper = &metaReader.me.ntbEntry.schemaRow;
    } else {
      /* No schema to search: fail the request rather than dereference a NULL schema wrapper below. */
      stError("invalid table type:%d", metaReader.me.type);
      code = TSDB_CODE_INVALID_PARA;
      lino = __LINE__;
      goto end;
    }

    for (size_t j = 0; j < sSchemaWrapper->nCols; j++) {
      SSchema* s = sSchemaWrapper->pSchema + j;
      if (strcmp(s->name, oInfo->refColName) == 0) {
        vTableInfo->cid = s->colId;
        break;
      }
    }
    if (vTableInfo->cid == 0) {
      stError("vgId:%d %s, not found col %s in table %s", TD_VID(pVnode), __func__, oInfo->refColName,
              oInfo->refTableName);
    }
    /* Reset the decoder so the reader can load the next table's entry. */
    tDecoderClear(&metaReader.coder);
  }

  STREAM_CHECK_RET_GOTO(buildOTableInfoRsp(&oTableInfo, &buf, &size));

end:
  tDestroySTriggerOrigTableInfoRsp(&oTableInfo);
  api.metaReaderFn.clearReader(&metaReader);
  STREAM_PRINT_LOG_END(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  return code;
}
1902

1903
/**
 * Handle a STRIGGER_PULL_VTABLE_PSEUDO_COL pull request.
 *
 * Looks up the table identified by req->virTablePseudoColReq.uid, which must be a virtual normal table or a
 * virtual child table, and builds a single-row data block:
 *   - virtual normal table: one binary column (colId -1) holding the table name;
 *   - virtual child table: one column per id in req->virTablePseudoColReq.cids, where id -1 yields the table
 *     name, an id present in the super table's tag schema yields that tag's value, and an id that is absent
 *     from the tag schema yields a NULL-typed column.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP carrying the serialized block (or the error code) is always sent.
 *
 * @param pVnode  vnode whose meta store is queried
 * @param pMsg    incoming RPC message; its info is echoed in the response
 * @param req     decoded pull request
 * @return 0 on success, otherwise the error code that was also placed in the response
 */
static int32_t vnodeProcessStreamVTableTagInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req) {
  int32_t      code = 0;
  int32_t      lino = 0;
  void*        buf = NULL;
  size_t       size = 0;
  SSDataBlock* pBlock = NULL;

  SMetaReader metaReader = {0};
  SMetaReader metaReaderStable = {0};
  // Not referenced directly below; presumably consumed by the stsDebug logging macro.
  int64_t streamId = req->base.streamId;
  stsDebug("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStorageAPI api = {0};
  initStorageAPI(&api);

  SArray* cols = req->virTablePseudoColReq.cids;
  STREAM_CHECK_NULL_GOTO(cols, terrno);

  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
  STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, req->virTablePseudoColReq.uid));

  STREAM_CHECK_CONDITION_GOTO(metaReader.me.type != TD_VIRTUAL_CHILD_TABLE && metaReader.me.type != TD_VIRTUAL_NORMAL_TABLE, TSDB_CODE_INVALID_PARA);

  STREAM_CHECK_RET_GOTO(createDataBlock(&pBlock));
  if (metaReader.me.type == TD_VIRTUAL_NORMAL_TABLE) {
    // The previous guard was `size < 1 && *first != -1`: it dereferenced element 0 only when the list was
    // empty (taosArrayGet results are NULL-checked elsewhere in this function) and could never reject a
    // non-empty list. Reject the empty list explicitly; non-empty lists keep being accepted as before.
    // NOTE(review): the intent was probably `||` (only the tbname pseudo column, id -1, makes sense for a
    // table without tags) -- confirm with the callers before tightening this further.
    STREAM_CHECK_CONDITION_GOTO(taosArrayGetSize(cols) < 1, TSDB_CODE_INVALID_PARA);
    SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN, -1);
    STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, 1));
    pBlock->info.rows = 1;
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, 0);
    STREAM_CHECK_NULL_GOTO(pDst, terrno);
    STREAM_CHECK_RET_GOTO(varColSetVarData(pDst, 0, metaReader.me.name, strlen(metaReader.me.name), false));
  } else if (metaReader.me.type == TD_VIRTUAL_CHILD_TABLE) {
    int64_t suid = metaReader.me.ctbEntry.suid;
    // Release the child-table reader's lock before a second locking reader is opened for the super table.
    api.metaReaderFn.readerReleaseLock(&metaReader);
    api.metaReaderFn.initReader(&metaReaderStable, pVnode, META_READER_LOCK, &api.metaFn);

    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReaderStable, suid));
    SSchemaWrapper* sSchemaWrapper = &metaReaderStable.me.stbEntry.schemaTag;

    // Pass 1: describe one output column per requested id.
    for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
      col_id_t* id = taosArrayGet(cols, i);
      STREAM_CHECK_NULL_GOTO(id, terrno);
      if (*id == -1) {
        // id -1 is the table-name pseudo column.
        SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN, -1);
        STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
        continue;
      }
      size_t j = 0;
      for (; j < sSchemaWrapper->nCols; j++) {
        SSchema* s = sSchemaWrapper->pSchema + j;
        if (s->colId == *id) {
          SColumnInfoData idata = createColumnInfoData(s->type, s->bytes, s->colId);
          STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
          break;
        }
      }
      if (j == sSchemaWrapper->nCols) {
        // Requested id is not a tag of the super table: emit a NULL-typed placeholder column.
        SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_NULL, CHAR_BYTES, *id);
        STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
      }
    }
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, 1));
    pBlock->info.rows = 1;

    // Pass 2: fill row 0 of every column.
    for (size_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
      STREAM_CHECK_NULL_GOTO(pDst, terrno);

      if (pDst->info.colId == -1) {
        STREAM_CHECK_RET_GOTO(varColSetVarData(pDst, 0, metaReader.me.name, strlen(metaReader.me.name), false));
        continue;
      }
      if (pDst->info.type == TSDB_DATA_TYPE_NULL) {
        STREAM_CHECK_RET_GOTO(colDataSetVal(pDst, 0, NULL, true));
        continue;
      }

      STagVal val = {0};
      val.cid = pDst->info.colId;
      const char* p = api.metaFn.extractTagVal(metaReader.me.ctbEntry.pTags, pDst->info.type, &val);

      char* data = NULL;
      if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
        data = tTagValToData((const STagVal*)p, false);
      } else {
        data = (char*)p;
      }

      // Same ownership condition the function has always used for freeing `data`: a non-JSON tag of a
      // variable-length type whose converted buffer is non-NULL.
      bool ownsData = (pDst->info.type != TSDB_DATA_TYPE_JSON) && (p != NULL) &&
                      IS_VAR_DATA_TYPE(((const STagVal*)p)->type) && (data != NULL);
      bool isNull = (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));

      // Release the buffer before checking the result so that a failed colDataSetVal no longer leaks it
      // (the check macro jumps straight to `end`).
      code = colDataSetVal(pDst, 0, data, isNull);
      if (ownsData) {
        taosMemoryFree(data);
      }
      STREAM_CHECK_RET_GOTO(code);
    }
  } else {
    // Defensive: the type check above already limits the type to the two virtual table kinds.
    stError("vgId:%d %s, invalid table type:%d", TD_VID(pVnode), __func__, metaReader.me.type);
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  stsDebug("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlock->info.rows);
  printDataBlock(pBlock, __func__, "");
  STREAM_CHECK_RET_GOTO(buildRsp(pBlock, &buf, &size));

end:
  api.metaReaderFn.clearReader(&metaReaderStable);
  api.metaReaderFn.clearReader(&metaReader);
  STREAM_PRINT_LOG_END(code, lino);
  // A response is sent on both the success and the failure path; on failure buf/size are still NULL/0.
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  blockDataDestroy(pBlock);
  return code;
}
2020

2021
/**
 * Handle a TDMT_STREAM_FETCH message on the stream reader queue.
 *
 * Decodes the fetch request, looks up the calc info selected by (queryId, taskId, execId), (re)creates the
 * executor task when the request asks for a reset or no task exists yet, runs the task once and sends the
 * produced blocks back in a TDMT_STREAM_FETCH_RSP. The response is sent on both the success and the failure
 * path and carries the resulting code.
 *
 * @param pVnode  vnode the request is executed against
 * @param pMsg    incoming RPC message; pCont/contLen hold the serialized SResFetchReq
 * @return 0 on success, otherwise the error code that was also placed in the response
 */
static int32_t vnodeProcessStreamFetchMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;
  void*   taskAddr = NULL;
  SArray* pResList = NULL;

  SResFetchReq req = {0};
  STREAM_CHECK_CONDITION_GOTO(tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0,
                              TSDB_CODE_QRY_INVALID_INPUT);
  SArray* calcInfoList = (SArray*)qStreamGetReaderInfo(req.queryId, req.taskId, &taskAddr);
  STREAM_CHECK_NULL_GOTO(calcInfoList, terrno);

  STREAM_CHECK_CONDITION_GOTO(req.execId < 0, TSDB_CODE_INVALID_PARA);
  SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo = taosArrayGetP(calcInfoList, req.execId);
  STREAM_CHECK_NULL_GOTO(sStreamReaderCalcInfo, terrno);
  // Not referenced directly below; presumably consumed by the ST_TASK_DLOG logging macro.
  void* pTask = sStreamReaderCalcInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, execId:%d, reset:%d, pTaskInfo:%p, scan type:%d", TD_VID(pVnode), __func__, req.execId, req.reset,
               sStreamReaderCalcInfo->pTaskInfo, nodeType(sStreamReaderCalcInfo->calcAst->pNode));

  if (req.reset || sStreamReaderCalcInfo->pTaskInfo == NULL) {
    qDestroyTask(sStreamReaderCalcInfo->pTaskInfo);
    // qDestroyTask receives the handle by value, so the stored pointer would otherwise keep referring to
    // the destroyed task. If any step below fails before a new task is created, a later request must see
    // NULL here (and rebuild) instead of reusing or destroying the stale handle again.
    sStreamReaderCalcInfo->pTaskInfo = NULL;

    // For a dynamic table name, pick the uid of the partition value flagged as tbname.
    // NOTE(review): req.pStRtFuncInfo is dereferenced here and in TSWAP below without a NULL check --
    // confirm that the deserializer always populates it.
    int64_t uid = 0;
    if (req.dynTbname) {
      SArray* vals = req.pStRtFuncInfo->pStreamPartColVals;
      for (size_t i = 0; i < taosArrayGetSize(vals); ++i) {
        SStreamGroupValue* pValue = taosArrayGet(vals, i);
        if (pValue != NULL && pValue->isTbname) {
          uid = pValue->uid;
          break;
        }
      }
    }

    SReadHandle handle = {0};
    handle.vnode = pVnode;
    handle.uid = uid;

    initStorageAPI(&handle.api);
    // Table scan / table merge scan plans may carry a time range node that is processed before the task
    // is built.
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(sStreamReaderCalcInfo->calcAst->pNode) ||
        QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == nodeType(sStreamReaderCalcInfo->calcAst->pNode)) {
      STimeRangeNode* node = (STimeRangeNode*)((STableScanPhysiNode*)(sStreamReaderCalcInfo->calcAst->pNode))->pTimeRange;
      if (node != NULL) {
        STREAM_CHECK_RET_GOTO(processCalaTimeRange(sStreamReaderCalcInfo, &req, node, &handle));
      }
    }

    // Swap the request's runtime function info into the calc info; the previous contents end up in req.
    TSWAP(sStreamReaderCalcInfo->rtInfo.funcInfo, *req.pStRtFuncInfo);
    handle.streamRtInfo = &sStreamReaderCalcInfo->rtInfo;

    // The task is always rebuilt from the scan plan here rather than reset in place.
    STREAM_CHECK_RET_GOTO(qCreateStreamExecTaskInfo(&sStreamReaderCalcInfo->pTaskInfo,
                                                    sStreamReaderCalcInfo->calcScanPlan, &handle, NULL, TD_VID(pVnode),
                                                    req.taskId));

    STREAM_CHECK_RET_GOTO(qSetTaskId(sStreamReaderCalcInfo->pTaskInfo, req.taskId, req.queryId));
  }

  if (req.pOpParam != NULL) {
    qUpdateOperatorParam(sStreamReaderCalcInfo->pTaskInfo, req.pOpParam);
  }

  pResList = taosArrayInit(4, POINTER_BYTES);
  STREAM_CHECK_NULL_GOTO(pResList, terrno);
  uint64_t ts = 0;
  bool     hasNext = false;
  STREAM_CHECK_RET_GOTO(qExecTaskOpt(sStreamReaderCalcInfo->pTaskInfo, pResList, &ts, &hasNext, NULL, req.pOpParam != NULL));

  for (size_t i = 0; i < taosArrayGetSize(pResList); i++) {
    SSDataBlock* pBlock = taosArrayGetP(pResList, i);
    if (pBlock == NULL) continue;
    printDataBlock(pBlock, __func__, "fetch");
    // Result blocks are additionally filtered only when the runtime info has withExternalWindow set.
    if (sStreamReaderCalcInfo->rtInfo.funcInfo.withExternalWindow) {
      STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, sStreamReaderCalcInfo->pFilterInfo));
      printDataBlock(pBlock, __func__, "fetch filter");
    }
  }

  STREAM_CHECK_RET_GOTO(streamBuildFetchRsp(pResList, hasNext, &buf, &size, pVnode->config.tsdbCfg.precision));
  ST_TASK_DLOG("vgId:%d %s end:", TD_VID(pVnode), __func__);

end:
  // Only the pointer array is destroyed here; the blocks it points to are not freed by this function.
  taosArrayDestroy(pResList);
  streamReleaseTask(taskAddr);

  STREAM_PRINT_LOG_END(code, lino);
  SRpcMsg rsp = {.msgType = TDMT_STREAM_FETCH_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  tDestroySResFetchReq(&req);
  return code;
}
2116

2117
/**
 * Entry point for messages taken from the vnode's stream reader queue.
 *
 * If the vnode is not ready for reads the message is redirected and 0 is returned. TDMT_STREAM_FETCH
 * messages are handed to vnodeProcessStreamFetchMsg. TDMT_STREAM_TRIGGER_PULL messages are decoded (the
 * payload follows an SMsgHead) and dispatched on req.base.type to the matching handler. Any other message
 * type, or an unknown pull type, yields TSDB_CODE_APP_ERROR.
 *
 * @param pVnode  vnode that owns the queue
 * @param pMsg    message to process
 * @return 0 on success or redirect, otherwise the error code of the failing step/handler
 */
int32_t vnodeProcessStreamReaderMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  int32_t                   code = 0;
  int32_t                   lino = 0;
  SSTriggerPullRequestUnion req = {0};
  void*                     taskAddr = NULL;
  SStreamTriggerReaderInfo* sStreamReaderInfo = NULL;
  void*                     payload = NULL;
  int32_t                   payloadLen = 0;

  vDebug("vgId:%d, msg:%p in stream reader queue is processing", pVnode->config.vgId, pMsg);
  if (!syncIsReadyForRead(pVnode->sync)) {
    vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
    return 0;
  }

  // Fetch messages have their own handler, which also takes care of cleanup and the response.
  if (pMsg->msgType == TDMT_STREAM_FETCH) {
    return vnodeProcessStreamFetchMsg(pVnode, pMsg);
  }

  if (pMsg->msgType != TDMT_STREAM_TRIGGER_PULL) {
    vError("unknown msg type:%d in stream reader queue", pMsg->msgType);
    code = TSDB_CODE_APP_ERROR;
    goto end;
  }

  // The serialized pull request starts right after the message head.
  payload = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  payloadLen = pMsg->contLen - sizeof(SMsgHead);
  STREAM_CHECK_RET_GOTO(tDeserializeSTriggerPullRequest(payload, payloadLen, &req));
  stDebug("vgId:%d %s start, type:%d, streamId:%" PRIx64 ", readerTaskId:%" PRIx64 ", sessionId:%" PRIx64,
          TD_VID(pVnode), __func__, req.base.type, req.base.streamId, req.base.readerTaskId, req.base.sessionId);

  // Every pull type except OTABLE_INFO looks up the reader info (which may come back NULL).
  if (STRIGGER_PULL_OTABLE_INFO != req.base.type) {
    sStreamReaderInfo = qStreamGetReaderInfo(req.base.streamId, req.base.readerTaskId, &taskAddr);
  }

  switch (req.base.type) {
    case STRIGGER_PULL_SET_TABLE:
      code = vnodeProcessStreamSetTableReq(pVnode, pMsg, &req, sStreamReaderInfo);
      break;

    case STRIGGER_PULL_LAST_TS:
      code = vnodeProcessStreamLastTsReq(pVnode, pMsg, &req, sStreamReaderInfo);
      break;

    case STRIGGER_PULL_FIRST_TS:
      code = vnodeProcessStreamFirstTsReq(pVnode, pMsg, &req, sStreamReaderInfo);
      break;

    case STRIGGER_PULL_TSDB_META:
    case STRIGGER_PULL_TSDB_META_NEXT:
      code = vnodeProcessStreamTsdbMetaReq(pVnode, pMsg, &req, sStreamReaderInfo);
      break;

    case STRIGGER_PULL_TSDB_TS_DATA:
      code = vnodeProcessStreamTsdbTsDataReq(pVnode, pMsg, &req, sStreamReaderInfo);
      break;

    case STRIGGER_PULL_TSDB_TRIGGER_DATA:
    case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT:
      code = vnodeProcessStreamTsdbTriggerDataReq(pVnode, pMsg, &req, sStreamReaderInfo);
      break;

    case STRIGGER_PULL_TSDB_CALC_DATA:
    case STRIGGER_PULL_TSDB_CALC_DATA_NEXT:
      code = vnodeProcessStreamTsdbCalcDataReq(pVnode, pMsg, &req, sStreamReaderInfo);
      break;

    case STRIGGER_PULL_TSDB_DATA:
    case STRIGGER_PULL_TSDB_DATA_NEXT:
      code = vnodeProcessStreamTsdbVirtalDataReq(pVnode, pMsg, &req, sStreamReaderInfo);
      break;

    case STRIGGER_PULL_WAL_META:
      code = vnodeProcessStreamWalMetaReq(pVnode, pMsg, &req, sStreamReaderInfo);
      break;

    case STRIGGER_PULL_WAL_TS_DATA:
    case STRIGGER_PULL_WAL_TRIGGER_DATA:
    case STRIGGER_PULL_WAL_CALC_DATA:
    case STRIGGER_PULL_WAL_DATA:
      code = vnodeProcessStreamWalDataReq(pVnode, pMsg, &req, sStreamReaderInfo);
      break;

    case STRIGGER_PULL_GROUP_COL_VALUE:
      code = vnodeProcessStreamGroupColValueReq(pVnode, pMsg, &req, sStreamReaderInfo);
      break;

    case STRIGGER_PULL_VTABLE_INFO:
      code = vnodeProcessStreamVTableInfoReq(pVnode, pMsg, &req, sStreamReaderInfo);
      break;

    case STRIGGER_PULL_VTABLE_PSEUDO_COL:
      code = vnodeProcessStreamVTableTagInfoReq(pVnode, pMsg, &req);
      break;

    case STRIGGER_PULL_OTABLE_INFO:
      code = vnodeProcessStreamOTableInfoReq(pVnode, pMsg, &req);
      break;

    default:
      vError("unknown inner msg type:%d in stream reader queue", req.base.type);
      code = TSDB_CODE_APP_ERROR;
      break;
  }

end:
  // Shared cleanup for every pull-path outcome; taskAddr is still NULL when no reader info was looked up.
  streamReleaseTask(taskAddr);
  tDestroySTriggerPullRequest(&req);
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc