• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4821

22 Oct 2025 06:02AM UTC coverage: 61.242% (-0.1%) from 61.353%
#4821

push

travis-ci

web-flow
Merge pull request #33334 from taosdata/3.0

fix(stream): reset tableScan operator (#33225)

156089 of 324573 branches covered (48.09%)

Branch coverage included in aggregate %.

184 of 244 new or added lines in 9 files covered. (75.41%)

789 existing lines in 124 files now uncovered.

207891 of 269759 relevant lines covered (77.07%)

244003752.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.94
/source/dnode/vnode/src/vnd/vnodeStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include <stdbool.h>
17
#include <stdint.h>
18
#include "nodes.h"
19
#include "osMemPool.h"
20
#include "osMemory.h"
21
#include "scalar.h"
22
#include "streamReader.h"
23
#include "taosdef.h"
24
#include "tarray.h"
25
#include "tcommon.h"
26
#include "tdatablock.h"
27
#include "tdb.h"
28
#include "tdef.h"
29
#include "tencode.h"
30
#include "tglobal.h"
31
#include "thash.h"
32
#include "tlist.h"
33
#include "tmsg.h"
34
#include "tsimplehash.h"
35
#include "vnd.h"
36
#include "vnode.h"
37
#include "vnodeInt.h"
38
#include "executor.h"
39

40
/*
 * Declares a local SStreamTriggerReaderTaskInnerOptions named `options` and fills every field
 * from the macro arguments.
 *
 *  - .suid is taken from the reader info only when `_mapInfo` is NULL; otherwise it is 0.
 *  - The reader-info parameter is spelled `_sStreamReaderInfo` on purpose: a macro parameter
 *    that has the same spelling as the `.sStreamReaderInfo` designator would be substituted
 *    inside the designator as well, so the macro would only compile when the caller's variable
 *    happened to be named exactly `sStreamReaderInfo`.
 *  - Arguments are parenthesized wherever they take part in an expression.
 *  - `_mapInfo` is expanded twice; pass an expression without side effects.
 *  - The expansion already ends with ';'.
 */
#define BUILD_OPTION(options, _sStreamReaderInfo, _ver, _order, startTime, endTime, _schemas, _isSchema, _scanMode, \
                     _gid, _initReader, _mapInfo)                                                                    \
  SStreamTriggerReaderTaskInnerOptions options = {                                                                   \
      .suid = ((_mapInfo) == NULL ? (_sStreamReaderInfo)->suid : 0),                                                 \
      .ver = (_ver),                                                                                                 \
      .order = (_order),                                                                                             \
      .twindows = {.skey = (startTime), .ekey = (endTime)},                                                          \
      .sStreamReaderInfo = (_sStreamReaderInfo),                                                                     \
      .schemas = (_schemas),                                                                                         \
      .isSchema = (_isSchema),                                                                                       \
      .scanMode = (_scanMode),                                                                                       \
      .gid = (_gid),                                                                                                 \
      .initReader = (_initReader),                                                                                   \
      .mapInfo = (_mapInfo)};
53

54
// Timestamp range collected from WAL submit data for one id.
// In this file the id is filled in by uidInTableList() and skey/ekey hold the first/last row
// timestamp, widened to min/max when several entries share the same id.
typedef struct WalMetaResult {
  uint64_t id;    // key used in the per-message hash
  int64_t  skey;  // smallest start timestamp seen for this id
  int64_t  ekey;  // largest end timestamp seen for this id
} WalMetaResult;
59

60
// Packs `type` into the bits above bit 31 and ORs it with `session` to form one 64-bit key.
static int64_t getSessionKey(int64_t session, int64_t type) {
  const int64_t typeBits = type << 32;
  return session | typeBits;
}
39,575,845✔
61

62
// Forward declaration; the definition is not part of this excerpt.
// NOTE(review): presumably fills *schemas with the schema of table `uid` read from the vnode
// meta — confirm against the definition. ("Scheam" is the identifier's actual spelling.)
static int32_t buildScheamFromMeta(SVnode* pVnode, int64_t uid, SArray** schemas);
63

64
// Three-way comparator over int16_t column ids (qsort/bsearch style).
// Returns -1 when *lp < *rp, 1 when *lp > *rp and 0 when they are equal.
// The operands are read through const-qualified pointers instead of casting the qualifier away.
int32_t sortCid(const void *lp, const void *rp) {
  const int16_t lhs = *(const int16_t *)lp;
  const int16_t rhs = *(const int16_t *)rp;
  return (lhs > rhs) - (lhs < rhs);
}
76

77
int32_t sortSSchema(const void *lp, const void *rp) {
2,686,364✔
78
  SSchema* c1 = (SSchema*)lp;
2,686,364✔
79
  SSchema* c2 = (SSchema*)rp;
2,686,364✔
80

81
  if (c1->colId < c2->colId) {
2,686,364✔
82
    return -1;
2,668,402✔
83
  } else if (c1->colId > c2->colId) {
17,962!
84
    return 1;
17,962✔
85
  }
86

87
  return 0;
×
88
}
89

90
// Copies one fixed-width value into column `index` of pResBlock at row position info.rows.
// The number of bytes copied is the column's info.bytes; info.rows is not advanced here.
// Returns terrno when the column cannot be fetched, 0 otherwise.
static int32_t addColData(SSDataBlock* pResBlock, int32_t index, void* data) {
  SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, index);
  if (NULL == pCol) {
    return terrno;
  }

  // destination = start of the column buffer + current row count * value width
  memcpy(pCol->pData + pResBlock->info.rows * pCol->info.bytes, data, pCol->info.bytes);
  return 0;
}
99

100
// Advances the task's tsdb reader to the next data block; *hasNext is filled by the reader.
// When the reader reports an error, the current data block is released before returning it.
static int32_t getTableDataInfo(SStreamReaderTaskInner* pTask, bool* hasNext) {
  int32_t rc = pTask->api.tsdReader.tsdNextDataBlock(pTask->pReader, hasNext);
  if (rc == TSDB_CODE_SUCCESS) {
    return rc;
  }

  pTask->api.tsdReader.tsdReaderReleaseDataBlock(pTask->pReader);
  return rc;
}
108

109
// Retrieves the reader's current data block into *ppRes (third reader argument is NULL).
static int32_t getTableData(SStreamReaderTaskInner* pTask, SSDataBlock** ppRes) {
  int32_t rc = pTask->api.tsdReader.tsdReaderRetrieveDataBlock(pTask->pReader, ppRes, NULL);
  return rc;
}
112

113
static int32_t buildOTableInfoRsp(const SSTriggerOrigTableInfoRsp* rsp, void** data, size_t* size) {
253,448✔
114
  int32_t code = 0;
253,448✔
115
  int32_t lino = 0;
253,448✔
116
  void*   buf = NULL;
253,448✔
117
  int32_t len = tSerializeSTriggerOrigTableInfoRsp(NULL, 0, rsp);
253,448✔
118
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
253,448!
119
  buf = rpcMallocCont(len);
253,448✔
120
  STREAM_CHECK_NULL_GOTO(buf, terrno);
253,448!
121
  int32_t actLen = tSerializeSTriggerOrigTableInfoRsp(buf, len, rsp);
253,448✔
122
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
253,448!
123
  *data = buf;
253,448✔
124
  *size = len;
253,448✔
125
  buf = NULL;
253,448✔
126
end:
253,448✔
127
  rpcFreeCont(buf);
253,448✔
128
  return code;
253,448✔
129
}
130

131
// Decides whether a newly seen table requires the reader's table list to be rebuilt.
//  - vtable stream: true when {suid, uid} is absent from the calc/trigger uid hash (picked by isCalc).
//  - otherwise: true only for a child table of the stream's super table whose uid has no group id
//    (qStreamGetGroupId returns -1) in the current table list.
static bool needRefreshTableList(SStreamTriggerReaderInfo* sStreamReaderInfo, int8_t tableType, int64_t suid, int64_t uid, bool isCalc){
  if (sStreamReaderInfo->isVtableStream) {
    int64_t key[2] = {suid, uid};
    return tSimpleHashGet(isCalc ? sStreamReaderInfo->uidHashCalc : sStreamReaderInfo->uidHashTrigger, key, sizeof(key)) == NULL;
  }

  if (tableType != TD_CHILD_TABLE) {
    return false;
  }

  return sStreamReaderInfo->tableType == TD_SUPER_TABLE &&
         suid == sStreamReaderInfo->suid &&
         qStreamGetGroupId(sStreamReaderInfo->tableList, uid) == -1;
}
149

150
// Tells whether table {suid, uid} belongs to this stream reader and, if so, stores the id to
// report for it in *id.
//  - vtable stream: member when {suid, uid} is in the calc/trigger uid hash; *id = uid.
//  - no table list: never a member.
//  - non-super-table stream: *id is the group id, or uid when there is no group id; member only
//    when uid equals the reader's own uid.
//  - super-table stream: suid must match; without a tag condition *id is 0 (no partition columns),
//    uid (group by tbname) or the group id; with a tag condition *id is the group id.
//    A group id of -1 means "not a member".
static bool uidInTableList(SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t suid, int64_t uid, uint64_t* id, bool isCalc){
  if (sStreamReaderInfo->isVtableStream) {
    int64_t key[2] = {suid, uid};
    if (tSimpleHashGet(isCalc ? sStreamReaderInfo->uidHashCalc : sStreamReaderInfo->uidHashTrigger, key, sizeof(key)) == NULL) {
      return false;
    }
    *id = uid;
    return true;
  }

  if (sStreamReaderInfo->tableList == NULL) {
    return false;
  }

  if (sStreamReaderInfo->tableType != TD_SUPER_TABLE) {
    *id = qStreamGetGroupId(sStreamReaderInfo->tableList, uid);
    if (*id == -1) {
      *id = uid;
    }
    return uid == sStreamReaderInfo->uid;
  }

  if (suid != sStreamReaderInfo->suid) {
    return false;
  }

  if (sStreamReaderInfo->pTagCond == NULL && sStreamReaderInfo->partitionCols == NULL) {
    *id = 0;
    return true;
  }
  if (sStreamReaderInfo->pTagCond == NULL && sStreamReaderInfo->groupByTbname) {
    *id = uid;
    return true;
  }

  // Remaining cases (tag condition present, or partitioned by something other than tbname)
  // resolve the id through the table list.
  *id = qStreamGetGroupId(sStreamReaderInfo->tableList, uid);
  return *id != -1;
}
184

185
// Builds a table list for the reader via qStreamCreateTableListForReader.
// isHistory selects the destination: historyTableList (no group-id map passed) or tableList
// (with sStreamReaderInfo->groupIdMap). The partition columns are cloned for the call and the
// clone is destroyed before returning.
static int32_t generateTablistForStreamReader(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamReaderInfo, bool isHistory) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SNodeList* pGroupCols = NULL;

  STREAM_CHECK_RET_GOTO(nodesCloneList(sStreamReaderInfo->partitionCols, &pGroupCols));

  SStorageAPI storageApi = {0};
  initStorageAPI(&storageApi);

  code = qStreamCreateTableListForReader(
      pVnode, sStreamReaderInfo->suid, sStreamReaderInfo->uid, sStreamReaderInfo->tableType, pGroupCols, true,
      sStreamReaderInfo->pTagCond, sStreamReaderInfo->pTagIndexCond, &storageApi,
      isHistory ? &sStreamReaderInfo->historyTableList : &sStreamReaderInfo->tableList,
      isHistory ? NULL : sStreamReaderInfo->groupIdMap);

end:
  nodesDestroyList(pGroupCols);
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
202

203
static int32_t buildVTableInfoRsp(const SStreamMsgVTableInfo* rsp, void** data, size_t* size) {
982,232✔
204
  int32_t code = 0;
982,232✔
205
  int32_t lino = 0;
982,232✔
206
  void*   buf = NULL;
982,232✔
207
  int32_t len = tSerializeSStreamMsgVTableInfo(NULL, 0, rsp);
982,232✔
208
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
982,232!
209
  buf = rpcMallocCont(len);
982,232✔
210
  STREAM_CHECK_NULL_GOTO(buf, terrno);
982,232!
211
  int32_t actLen = tSerializeSStreamMsgVTableInfo(buf, len, rsp);
982,232✔
212
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
980,914!
213
  *data = buf;
980,914✔
214
  *size = len;
980,914✔
215
  buf = NULL;
980,914✔
216
end:
980,914✔
217
  rpcFreeCont(buf);
980,914✔
218
  return code;
982,232✔
219
}
220

221
static int32_t buildTsRsp(const SStreamTsResponse* tsRsp, void** data, size_t* size) {
1,357,778✔
222
  int32_t code = 0;
1,357,778✔
223
  int32_t lino = 0;
1,357,778✔
224
  void*   buf = NULL;
1,357,778✔
225
  int32_t len = tSerializeSStreamTsResponse(NULL, 0, tsRsp);
1,357,778✔
226
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
1,357,778!
227
  buf = rpcMallocCont(len);
1,357,778✔
228
  STREAM_CHECK_NULL_GOTO(buf, terrno);
1,359,114!
229
  int32_t actLen = tSerializeSStreamTsResponse(buf, len, tsRsp);
1,359,114✔
230
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
1,357,778!
231
  *data = buf;
1,357,778✔
232
  *size = len;
1,359,114✔
233
  buf = NULL;
1,359,114✔
234
end:
1,359,114✔
235
  rpcFreeCont(buf);
1,359,114✔
236
  return code;
1,357,778✔
237
}
238

239

240
// Encodes a data block into a buffer obtained from rpcMallocCont.
// A NULL block or a block with zero rows is not an error: the function returns success and
// leaves *data/*size untouched. On success *size is the size computed by blockGetEncodeSize.
static int32_t buildRsp(SSDataBlock* pBlock, void** data, size_t* size) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   pCont = NULL;

  STREAM_CHECK_CONDITION_GOTO(pBlock == NULL || pBlock->info.rows == 0, TSDB_CODE_SUCCESS);

  size_t encodeCap = blockGetEncodeSize(pBlock);
  pCont = rpcMallocCont(encodeCap);
  STREAM_CHECK_NULL_GOTO(pCont, terrno);

  int32_t encoded = blockEncode(pBlock, pCont, encodeCap, taosArrayGetSize(pBlock->pDataBlock));
  STREAM_CHECK_CONDITION_GOTO(encoded < 0, terrno);

  *data = pCont;
  *size = encodeCap;
  pCont = NULL;  // handed out: keep the cleanup below from releasing it

end:
  rpcFreeCont(pCont);
  return code;
}
257

258
// Re-targets the task's tsdb reader at the tables of the current group (currentGroupIndex):
// fetches the group's table keys, hands them to the reader, rebuilds the query condition and
// resets the reader status. An empty table list yields TSDB_CODE_INVALID_PARA.
static int32_t resetTsdbReader(SStreamReaderTaskInner* pTask) {
  int32_t        code = 0;
  int32_t        lino = 0;
  int32_t        nKeys = 1;
  STableKeyInfo* pKeys = NULL;

  STREAM_CHECK_RET_GOTO(qStreamGetTableList(pTask->pTableList, pTask->currentGroupIndex, &pKeys, &nKeys));
  if (nKeys == 0 || pKeys == NULL) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  STREAM_CHECK_RET_GOTO(pTask->api.tsdReader.tsdSetQueryTableList(pTask->pReader, pKeys, nKeys));

  cleanupQueryTableDataCond(&pTask->cond);

  // For vtable streams the suid used in the condition comes from the first key's groupId.
  uint64_t suid;
  if (pTask->options.sStreamReaderInfo->isVtableStream) {
    suid = pKeys->groupId;
  } else {
    suid = pTask->options.suid;
  }
  STREAM_CHECK_RET_GOTO(qStreamInitQueryTableDataCond(&pTask->cond, pTask->options.order, pTask->options.schemas, true,
                                                      pTask->options.twindows, suid, pTask->options.ver, NULL));
  STREAM_CHECK_RET_GOTO(pTask->api.tsdReader.tsdReaderResetStatus(pTask->pReader, &pTask->cond));

end:
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
280

281
// Writes one WAL meta row into pBlock at the block's current row position, column by column:
// type, [id — only when !isVTable], uid, skey, ekey, ver, rows. The row count is not advanced.
static int32_t buildWalMetaBlock(SSDataBlock* pBlock, int8_t type, int64_t id, bool isVTable, int64_t uid,
                                 int64_t skey, int64_t ekey, int64_t ver, int64_t rows) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t col = 0;

  STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &type));
  if (!isVTable) {
    STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &id));
  }
  STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &uid));
  STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &skey));
  STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &ekey));
  STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &ver));
  STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &rows));

end:
  // STREAM_PRINT_LOG_END(code, lino)
  return code;
}
300

301
// Writes one row (id, skey, ekey, ver) into columns 0..3 of pBlock at the block's current row
// position. The caller advances info.rows afterwards.
static int32_t buildWalMetaBlockNew(SSDataBlock* pBlock, int64_t id, int64_t skey, int64_t ekey, int64_t ver) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t col = 0;

  STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &id));
  STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &skey));
  STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &ekey));
  STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &ver));

end:
  return code;
}
313

314
// Writes one row (id, ver) into columns 0..1 of pBlock at the block's current row position.
// The caller advances info.rows afterwards.
static int32_t buildDropTableBlock(SSDataBlock* pBlock, int64_t id, int64_t ver) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t col = 0;

  STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &id));
  STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &ver));

end:
  return code;
}
324

325
// Fills pTSchema as a single-column schema: version `ver`, with columns[0] described by
// colId / type / bytes.
static void buildTSchema(STSchema* pTSchema, int32_t ver, col_id_t colId, int8_t type, int32_t bytes) {
  // schema header
  pTSchema->version = ver;
  pTSchema->numOfCols = 1;

  // the only column
  pTSchema->columns[0].bytes = bytes;
  pTSchema->columns[0].type = type;
  pTSchema->columns[0].colId = colId;
}
×
332

333
// Decodes a delete request from a WAL entry and appends one row (id, skey, ekey, ver) to
// rsp->deleteBlock for every deleted uid that belongs to this stream reader.
//
//  - For a non-vtable super-table stream, a request for another suid is ignored (returns success).
//  - The uid list allocation is checked before decoding into it.
//  - A uid that is not in the reader's table list is skipped and the remaining uids are still
//    examined (same handling as scanDropTableNew), instead of ending the scan at the first miss.
//
// Returns 0 on success or the first error encountered; rows appended before an error stay in
// the block.
static int32_t scanDeleteDataNew(SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* rsp, void* data, int32_t len,
                              int64_t ver) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SDecoder   decoder = {0};
  SDeleteRes req = {0};
  void*      pTask = sStreamReaderInfo->pTask;  // referenced by the ST_TASK_* log macros

  tDecoderInit(&decoder, data, len);

  req.uidList = taosArrayInit(0, sizeof(tb_uid_t));
  STREAM_CHECK_NULL_GOTO(req.uidList, terrno);

  STREAM_CHECK_RET_GOTO(tDecodeDeleteRes(&decoder, &req));
  STREAM_CHECK_CONDITION_GOTO((sStreamReaderInfo->tableType == TSDB_SUPER_TABLE && !sStreamReaderInfo->isVtableStream && req.suid != sStreamReaderInfo->suid), TDB_CODE_SUCCESS);

  for (int32_t i = 0; i < taosArrayGetSize(req.uidList); i++) {
    uint64_t* uid = taosArrayGet(req.uidList, i);
    STREAM_CHECK_NULL_GOTO(uid, terrno);
    uint64_t id = 0;
    ST_TASK_ILOG("stream reader scan delete start data:uid %" PRIu64 ", skey %" PRIu64 ", ekey %" PRIu64, *uid, req.skey, req.ekey);

    // Not one of ours: move on to the next uid rather than abandoning the rest of the list.
    if (!uidInTableList(sStreamReaderInfo, req.suid, *uid, &id, false)) {
      continue;
    }

    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->deleteBlock, ((SSDataBlock*)rsp->deleteBlock)->info.rows + 1));
    STREAM_CHECK_RET_GOTO(buildWalMetaBlockNew(rsp->deleteBlock, id, req.skey, req.ekey, ver));
    ((SSDataBlock*)rsp->deleteBlock)->info.rows++;
    rsp->totalRows++;
  }

end:
  taosArrayDestroy(req.uidList);
  tDecoderClear(&decoder);
  return code;
}
363

364
// Decodes a batch drop-table request from a WAL entry and appends one row (id, ver) to
// rsp->dropBlock for every dropped table that belongs to this stream reader. Tables that are
// not in the reader's table list are skipped.
static int32_t scanDropTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* rsp, void* data, int32_t len,
                             int64_t ver) {
  int32_t          code = 0;
  int32_t          lino = 0;
  SDecoder         decoder = {0};
  SVDropTbBatchReq req = {0};
  void*            pTask = sStreamReaderInfo->pTask;  // referenced by the ST_TASK_* log macros

  tDecoderInit(&decoder, data, len);
  STREAM_CHECK_RET_GOTO(tDecodeSVDropTbBatchReq(&decoder, &req));

  for (int32_t i = 0; i < req.nReqs; i++) {
    SVDropTbReq* pOne = req.pReqs + i;
    STREAM_CHECK_NULL_GOTO(pOne, TSDB_CODE_INVALID_PARA);

    uint64_t id = 0;
    if (uidInTableList(sStreamReaderInfo, pOne->suid, pOne->uid, &id, false)) {
      STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->dropBlock, ((SSDataBlock*)rsp->dropBlock)->info.rows + 1));
      STREAM_CHECK_RET_GOTO(buildDropTableBlock(rsp->dropBlock, id, ver));
      ((SSDataBlock*)rsp->dropBlock)->info.rows++;
      rsp->totalRows++;
      ST_TASK_ILOG("stream reader scan drop uid %" PRId64 ", id %" PRIu64, pOne->uid, id);
    }
  }

end:
  tDecoderClear(&decoder);
  return code;
}
394

395
// Rebuilds the reader's table lists while holding sStreamReaderInfo->mutex.
// The live table list is destroyed and regenerated first; only if that succeeds is the history
// table list destroyed and regenerated as well. Returns the code of the last generation attempt.
static int32_t reloadTableList(SStreamTriggerReaderInfo* sStreamReaderInfo){
  int32_t code = 0;

  (void)taosThreadMutexLock(&sStreamReaderInfo->mutex);

  qStreamDestroyTableList(sStreamReaderInfo->tableList);
  sStreamReaderInfo->tableList = NULL;
  code = generateTablistForStreamReader(sStreamReaderInfo->pVnode, sStreamReaderInfo, false);

  if (code == 0) {
    qStreamDestroyTableList(sStreamReaderInfo->historyTableList);
    sStreamReaderInfo->historyTableList = NULL;
    code = generateTablistForStreamReader(sStreamReaderInfo->pVnode, sStreamReaderInfo, true);
  }

  (void)taosThreadMutexUnlock(&sStreamReaderInfo->mutex);
  return code;
}
408

409
// Decodes a batch create-table request from a WAL entry and reloads the reader's table lists
// once if at least one created table is relevant (per needRefreshTableList). The scan stops at
// the first relevant table; irrelevant ones before it are only logged.
static int32_t scanCreateTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, void* data, int32_t len) {
  int32_t            code = 0;
  int32_t            lino = 0;
  SDecoder           decoder = {0};
  SVCreateTbBatchReq req = {0};
  void*              pTask = sStreamReaderInfo->pTask;  // referenced by the ST_TASK_* log macros

  tDecoderInit(&decoder, data, len);
  STREAM_CHECK_RET_GOTO(tDecodeSVCreateTbBatchReq(&decoder, &req));

  bool needReload = false;
  for (int32_t i = 0; i < req.nReqs && !needReload; i++) {
    SVCreateTbReq* pOne = req.pReqs + i;
    if (needRefreshTableList(sStreamReaderInfo, pOne->type, pOne->ctb.suid, pOne->uid, false)) {
      ST_TASK_ILOG("stream reader scan create table %s", pOne->name);
      needReload = true;
    } else {
      ST_TASK_ILOG("stream reader scan create table jump, %s", pOne->name);
    }
  }
  STREAM_CHECK_CONDITION_GOTO(!needReload, TDB_CODE_SUCCESS);

  STREAM_CHECK_RET_GOTO(reloadTableList(sStreamReaderInfo));

end:
  tDeleteSVCreateTbBatchReq(&req);
  tDecoderClear(&decoder);
  return code;
}
441

442
// Handles a create-table request embedded in a submit: reloads the reader's table lists when
// the table is relevant (per needRefreshTableList), otherwise only logs and returns success.
static int32_t processAutoCreateTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, SVCreateTbReq* pCreateReq) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   pTask = sStreamReaderInfo->pTask;  // referenced by the ST_TASK_* log macros

  if (needRefreshTableList(sStreamReaderInfo, pCreateReq->type, pCreateReq->ctb.suid, pCreateReq->uid, false)) {
    ST_TASK_ILOG("stream reader scan auto create table %s", pCreateReq->name);
    STREAM_CHECK_RET_GOTO(reloadTableList(sStreamReaderInfo));
  } else {
    ST_TASK_DLOG("stream reader scan auto create table jump, %s", pCreateReq->name);
  }

end:
  return code;
}
456

457
// Decodes an alter-table request from a WAL entry and reloads the reader's table lists when it
// updates tag value(s) of a child table whose super table is the reader's suid. Every other
// request is ignored and reported as success.
static int32_t scanAlterTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, void* data, int32_t len) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SDecoder     decoder = {0};
  SVAlterTbReq req = {0};
  void*        pTask = sStreamReaderInfo->pTask;  // referenced by the ST_TASK_* log macros

  tDecoderInit(&decoder, data, len);
  STREAM_CHECK_RET_GOTO(tDecodeSVAlterTbReq(&decoder, &req));

  // Only single/multi tag-value updates are of interest.
  STREAM_CHECK_CONDITION_GOTO(req.action != TSDB_ALTER_TABLE_UPDATE_TAG_VAL && req.action != TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL, TDB_CODE_SUCCESS);

  // Resolve the altered table's type and owning super table by name.
  ETableType kind = 0;
  uint64_t   ownerSuid = 0;
  STREAM_CHECK_RET_GOTO(metaGetTableTypeSuidByName(sStreamReaderInfo->pVnode, req.tbName, &kind, &ownerSuid));
  STREAM_CHECK_CONDITION_GOTO(kind != TSDB_CHILD_TABLE, TDB_CODE_SUCCESS);
  STREAM_CHECK_CONDITION_GOTO(ownerSuid != sStreamReaderInfo->suid, TDB_CODE_SUCCESS);

  STREAM_CHECK_RET_GOTO(reloadTableList(sStreamReaderInfo));
  ST_TASK_ILOG("stream reader scan alter table %s", req.tbName);

end:
  taosArrayDestroy(req.pMultiTag);
  tDecoderClear(&decoder);
  return code;
}
483

484
// static int32_t scanAlterSTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, void* data, int32_t len) {
485
//   int32_t  code = 0;
486
//   int32_t  lino = 0;
487
//   SDecoder decoder = {0};
488
//   SMAlterStbReq reqAlter = {0};
489
//   SVCreateStbReq req = {0};
490
//   tDecoderInit(&decoder, data, len);
491
//   void* pTask = sStreamReaderInfo->pTask;
492
  
493
//   STREAM_CHECK_RET_GOTO(tDecodeSVCreateStbReq(&decoder, &req));
494
//   STREAM_CHECK_CONDITION_GOTO(req.suid != sStreamReaderInfo->suid, TDB_CODE_SUCCESS);
495
//   if (req.alterOriData != 0) {
496
//     STREAM_CHECK_RET_GOTO(tDeserializeSMAlterStbReq(req.alterOriData, req.alterOriDataLen, &reqAlter));
497
//     STREAM_CHECK_CONDITION_GOTO(reqAlter.alterType != TSDB_ALTER_TABLE_DROP_TAG && reqAlter.alterType != TSDB_ALTER_TABLE_UPDATE_TAG_NAME, TDB_CODE_SUCCESS);
498
//   }
499
  
500
//   STREAM_CHECK_RET_GOTO(reloadTableList(sStreamReaderInfo));
501

502
//   ST_TASK_ILOG("stream reader scan alter suid %" PRId64, req.suid);
503
// end:
504
//   tFreeSMAltertbReq(&reqAlter);
505
//   tDecoderClear(&decoder);
506
//   return code;
507
// }
508

509
// Decodes a drop-super-table request from a WAL entry and reloads the reader's table lists when
// the dropped super table is the reader's suid; any other suid is ignored (success).
static int32_t scanDropSTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, void* data, int32_t len) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SDecoder     decoder = {0};
  SVDropStbReq req = {0};
  void*        pTask = sStreamReaderInfo->pTask;  // referenced by the ST_TASK_* log macros

  tDecoderInit(&decoder, data, len);
  STREAM_CHECK_RET_GOTO(tDecodeSVDropStbReq(&decoder, &req));
  STREAM_CHECK_CONDITION_GOTO(req.suid != sStreamReaderInfo->suid, TDB_CODE_SUCCESS);

  STREAM_CHECK_RET_GOTO(reloadTableList(sStreamReaderInfo));
  ST_TASK_ILOG("stream reader scan drop suid %" PRId64, req.suid);

end:
  tDecoderClear(&decoder);
  return code;
}
526

527
// Decodes one per-table submit entry from pCoder and records the timestamp range it covers in
// gidHash, keyed by the id that uidInTableList() reports for the table.
//
// Steps:
//  1. Decode the flags word (bits 8..15 carry a version, the low byte the submit flags).
//  2. If the entry auto-creates its table, decode the create request and let
//     processAutoCreateTableNew() refresh the table lists when needed.
//  3. Decode suid/uid; entries for tables outside the reader's table list are skipped (success).
//  4. Find the first/last timestamp: from the first column in column format, or from the first
//     and last row in row format.
//  5. Merge [skey, ekey] into gidHash (min skey / max ekey when the id is already present).
//
// Column format is rejected with TSDB_CODE_INVALID_MSG when it carries no column or no value,
// so the first column is never indexed at position -1. The row loop counter has the same width
// as the decoded row count.
//
// NOTE(review): tEndDecode() in the cleanup also runs when tStartDecode() failed — confirm that
// this is balanced with the caller's decoder state.
static int32_t scanSubmitTbDataForMeta(SDecoder *pCoder, SStreamTriggerReaderInfo* sStreamReaderInfo, SSHashObj* gidHash) {
  int32_t       code = 0;
  int32_t       lino = 0;
  WalMetaResult walMeta = {0};
  SSubmitTbData submitTbData = {0};

  if (tStartDecode(pCoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  uint8_t version = 0;
  if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }
  version = (submitTbData.flags >> 8) & 0xff;
  submitTbData.flags = submitTbData.flags & 0xff;

  // STREAM_CHECK_CONDITION_GOTO(version < 2, TDB_CODE_SUCCESS);
  if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
    submitTbData.pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
    STREAM_CHECK_NULL_GOTO(submitTbData.pCreateTbReq, terrno);
    STREAM_CHECK_RET_GOTO(tDecodeSVCreateTbReq(pCoder, submitTbData.pCreateTbReq));
    STREAM_CHECK_RET_GOTO(processAutoCreateTableNew(sStreamReaderInfo, submitTbData.pCreateTbReq));
  }

  // submit data
  if (tDecodeI64(pCoder, &submitTbData.suid) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }
  if (tDecodeI64(pCoder, &submitTbData.uid) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  // Tables that do not belong to this reader contribute nothing.
  if (!uidInTableList(sStreamReaderInfo, submitTbData.suid, submitTbData.uid, &walMeta.id, false)){
    goto end;
  }
  if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    uint64_t nColData = 0;
    // At least one column must be present: the first one supplies the timestamps.
    if (tDecodeU64v(pCoder, &nColData) < 0 || nColData == 0) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, end);
    }

    SColData colData = {0};
    code = tDecodeColData(version, pCoder, &colData, false);
    if (code) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, end);
    }

    // Every value must be present and there must be at least one, otherwise pData[nVal - 1]
    // below would read outside the column.
    if (colData.flag != HAS_VALUE || colData.nVal <= 0) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, end);
    }
    walMeta.skey = ((TSKEY *)colData.pData)[0];
    walMeta.ekey = ((TSKEY *)colData.pData)[colData.nVal - 1];

    // The remaining columns are decoded only to advance the decoder.
    for (uint64_t i = 1; i < nColData; i++) {
      code = tDecodeColData(version, pCoder, &colData, true);
      if (code) {
        code = TSDB_CODE_INVALID_MSG;
        TSDB_CHECK_CODE(code, lino, end);
      }
    }
  } else {
    uint64_t nRow = 0;
    if (tDecodeU64v(pCoder, &nRow) < 0) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, end);
    }

    // Rows are walked in place; each row's own length field gives the step to the next one.
    for (uint64_t iRow = 0; iRow < nRow; ++iRow) {
      SRow *pRow = (SRow *)(pCoder->data + pCoder->pos);
      pCoder->pos += pRow->len;
      if (iRow == 0){
#ifndef NO_UNALIGNED_ACCESS
        walMeta.skey = pRow->ts;
#else
        walMeta.skey = taosGetInt64Aligned(&pRow->ts);
#endif
      }
      if (iRow == nRow - 1) {
#ifndef NO_UNALIGNED_ACCESS
        walMeta.ekey = pRow->ts;
#else
        walMeta.ekey = taosGetInt64Aligned(&pRow->ts);
#endif
      }
    }
  }

  WalMetaResult* data = (WalMetaResult*)tSimpleHashGet(gidHash, &walMeta.id, LONG_BYTES);
  if (data != NULL) {
    // Same id seen earlier in this message: widen the stored range.
    if (walMeta.skey < data->skey) data->skey = walMeta.skey;
    if (walMeta.ekey > data->ekey) data->ekey = walMeta.ekey;
  } else {
    STREAM_CHECK_RET_GOTO(tSimpleHashPut(gidHash, &walMeta.id, LONG_BYTES, &walMeta, sizeof(WalMetaResult)));
  }

end:
  tDestroySVSubmitCreateTbReq(submitTbData.pCreateTbReq, TSDB_MSG_FLG_DECODE);
  taosMemoryFreeClear(submitTbData.pCreateTbReq);
  tEndDecode(pCoder);
  return code;
}
641

642
// Decodes a submit message from a WAL entry, collects the timestamp range per id over all of
// its per-table entries (scanSubmitTbDataForMeta) and appends one row (id, skey, ekey, ver) per
// id to rsp->metaBlock, bumping the block's row count and rsp->totalRows for each.
static int32_t scanSubmitDataForMeta(SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* rsp, void* data, int32_t len, int64_t ver) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SDecoder   decoder = {0};
  SSHashObj* pRanges = NULL;                     // id -> WalMetaResult for this message
  void*      pTask = sStreamReaderInfo->pTask;   // referenced by the ST_TASK_* log macros

  tDecoderInit(&decoder, data, len);
  if (tStartDecode(&decoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  uint64_t nSubmitTbData = 0;
  if (tDecodeU64v(&decoder, &nSubmitTbData) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  pRanges = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
  STREAM_CHECK_NULL_GOTO(pRanges, terrno);

  for (int32_t i = 0; i < nSubmitTbData; i++) {
    STREAM_CHECK_RET_GOTO(scanSubmitTbDataForMeta(&decoder, sStreamReaderInfo, pRanges));
  }
  tEndDecode(&decoder);

  // Reserve room for every collected id before writing the rows.
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->metaBlock, ((SSDataBlock*)rsp->metaBlock)->info.rows + tSimpleHashGetSize(pRanges)));

  int32_t iter = 0;
  for (void* pIt = tSimpleHashIterate(pRanges, NULL, &iter); pIt != NULL; pIt = tSimpleHashIterate(pRanges, pIt, &iter)) {
    WalMetaResult* pMeta = (WalMetaResult*)pIt;
    STREAM_CHECK_RET_GOTO(buildWalMetaBlockNew(rsp->metaBlock, pMeta->id, pMeta->skey, pMeta->ekey, ver));
    ((SSDataBlock*)rsp->metaBlock)->info.rows++;
    rsp->totalRows++;
    ST_TASK_DLOG("stream reader scan submit data:skey %" PRId64 ", ekey %" PRId64 ", id %" PRIu64
          ", ver:%"PRId64, pMeta->skey, pMeta->ekey, pMeta->id, ver);
  }

end:
  tDecoderClear(&decoder);
  tSimpleHashCleanup( pRanges);
  return code;
}
686

687
// Build the data block used to return tsdb meta rows.
//
// Columns are registered through qStreamBuildSchema() with a running index
// that starts at 1, in this order:
//   skey (TIMESTAMP), ekey (TIMESTAMP), uid (BIGINT),
//   gid (UBIGINT, only when isVTable is false), nrows (BIGINT).
// The block itself is produced by createDataBlockForStream() into *pBlock.
//
// The temporary schema array is destroyed on every path. Returns `code`,
// which stays 0 unless one of the STREAM_CHECK_* macros records a failure.
static int32_t createBlockForTsdbMeta(SSDataBlock** pBlock, bool isVTable) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t nextIdx = 1;

  SArray* pSchemaList = taosArrayInit(8, sizeof(SSchema));
  STREAM_CHECK_NULL_GOTO(pSchemaList, terrno);

  // skey, then ekey
  for (int32_t n = 0; n < 2; ++n) {
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(pSchemaList, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, nextIdx++));
  }

  // uid
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(pSchemaList, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, nextIdx++));

  // gid is only present for non-virtual tables
  if (isVTable == false) {
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(pSchemaList, TSDB_DATA_TYPE_UBIGINT, LONG_BYTES, nextIdx++));
  }

  // nrows
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(pSchemaList, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, nextIdx++));

  STREAM_CHECK_RET_GOTO(createDataBlockForStream(pSchemaList, pBlock));

end:
  taosArrayDestroy(pSchemaList);
  return code;
}
708

709
// Build the data block used to return WAL meta rows.
//
// Four BIGINT columns are registered with indexes 0..3, in this order:
//   gid (non vtable) / uid (vtable), skey, ekey, ver.
// The block itself is produced by createDataBlockForStream() into *pBlock.
//
// The temporary schema array is destroyed on every path. Returns `code`,
// which stays 0 unless one of the STREAM_CHECK_* macros records a failure.
static int32_t createBlockForWalMetaNew(SSDataBlock** pBlock) {
  int32_t code = 0;
  int32_t lino = 0;

  SArray* pSchemaList = taosArrayInit(8, sizeof(SSchema));
  STREAM_CHECK_NULL_GOTO(pSchemaList, terrno);

  for (int32_t colIdx = 0; colIdx < 4; ++colIdx) {
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(pSchemaList, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, colIdx));
  }

  STREAM_CHECK_RET_GOTO(createDataBlockForStream(pSchemaList, pBlock));

end:
  taosArrayDestroy(pSchemaList);
  return code;
}
729

730
// Build the data block used to report dropped tables.
//
// Two BIGINT columns are registered with indexes 0 and 1, in this order:
//   gid (non vtable) / uid (vtable), ver.
// The block itself is produced by createDataBlockForStream() into *pBlock.
//
// The temporary schema array is destroyed on every path. Returns `code`,
// which stays 0 unless one of the STREAM_CHECK_* macros records a failure.
static int32_t createBlockForDropTable(SSDataBlock** pBlock) {
  int32_t code = 0;
  int32_t lino = 0;

  SArray* pSchemaList = taosArrayInit(8, sizeof(SSchema));
  STREAM_CHECK_NULL_GOTO(pSchemaList, terrno);

  for (int32_t colIdx = 0; colIdx < 2; ++colIdx) {
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(pSchemaList, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, colIdx));
  }

  STREAM_CHECK_RET_GOTO(createDataBlockForStream(pSchemaList, pBlock));

end:
  taosArrayDestroy(pSchemaList);
  return code;
}
748

749
// Dispatch one non-submit WAL record to the matching scan routine.
//
//   msgType            WAL message type of the record
//   sStreamReaderInfo  reader state; deleteReCalc / deleteOutTbl gate the
//                      delete and drop-table handling
//   data, len          record body as handed over by the caller
//   rsp                response being assembled; deleteBlock / dropBlock are
//                      created here on first use
//   ver                WAL version of the record. Declared 64-bit so the
//                      caller's int64_t version is not truncated on the way in.
//
// Message types without a matching branch (and TDMT_VND_ALTER_STB, whose
// handler is disabled) are ignored. Returns `code`, which stays 0 unless one
// of the STREAM_CHECK_* macros records a failure.
static int32_t processMeta(int16_t msgType, SStreamTriggerReaderInfo* sStreamReaderInfo, void *data, int32_t len, SSTriggerWalNewRsp* rsp, int64_t ver) {
  int32_t code = 0;
  int32_t lino = 0;
  // NOTE(review): dcoder is initialised and cleared but never read in this
  // function; every branch hands `data`/`len` to its own scan routine.
  SDecoder dcoder = {0};
  tDecoderInit(&dcoder, data, len);
  if (msgType == TDMT_VND_DELETE && sStreamReaderInfo->deleteReCalc != 0) {
    // lazily create the block that collects delete ranges
    if (rsp->deleteBlock == NULL) {
      STREAM_CHECK_RET_GOTO(createBlockForWalMetaNew((SSDataBlock**)&rsp->deleteBlock));
    }

    STREAM_CHECK_RET_GOTO(scanDeleteDataNew(sStreamReaderInfo, rsp, data, len, ver));
  } else if (msgType == TDMT_VND_DROP_TABLE && sStreamReaderInfo->deleteOutTbl != 0) {
    // lazily create the block that collects dropped tables
    if (rsp->dropBlock == NULL) {
      STREAM_CHECK_RET_GOTO(createBlockForDropTable((SSDataBlock**)&rsp->dropBlock));
    }
    STREAM_CHECK_RET_GOTO(scanDropTableNew(sStreamReaderInfo, rsp, data, len, ver));
  } else if (msgType == TDMT_VND_DROP_STB) {
    STREAM_CHECK_RET_GOTO(scanDropSTableNew(sStreamReaderInfo, data, len));
  } else if (msgType == TDMT_VND_CREATE_TABLE) {
    STREAM_CHECK_RET_GOTO(scanCreateTableNew(sStreamReaderInfo, data, len));
  } else if (msgType == TDMT_VND_ALTER_STB) {
    // STREAM_CHECK_RET_GOTO(scanAlterSTableNew(sStreamReaderInfo, data, len));
  } else if (msgType == TDMT_VND_ALTER_TABLE) {
    STREAM_CHECK_RET_GOTO(scanAlterTableNew(sStreamReaderInfo, data, len));
  }

end:
  tDecoderClear(&dcoder);
  return code;
}
779
// Walk the vnode WAL starting at rsp->ver and collect meta information.
//
// A reader is opened on pVnode->pWal and positioned at rsp->ver. For every
// valid record, rsp->ver and rsp->verTime are updated from the record.
// Submit records go to scanSubmitDataForMeta(); all other records go to
// processMeta(). The walk stops when
//   - the WAL reports TSDB_CODE_WAL_LOG_NOT_EXIST (treated as success),
//   - a record's ingestTs / 1000 is greater than ctime, or
//   - rsp->totalRows reaches STREAM_RETURN_ROWS_NUM.
// If the initial seek reports TSDB_CODE_WAL_LOG_NOT_EXIST and rsp->ver is
// older than the first WAL version, rsp->ver is moved up to that version.
//
// The reader is closed on every path. Returns `code`, which stays 0 unless
// a call fails with something other than TSDB_CODE_WAL_LOG_NOT_EXIST.
static int32_t processWalVerMetaNew(SVnode* pVnode, SSTriggerWalNewRsp* rsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
                       int64_t ctime) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   pTask = sStreamReaderInfo->pTask;  // not referenced by name below; presumably consumed by the ST_TASK_* log macros

  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);

  code = walReaderSeekVer(pWalReader, rsp->ver);
  if (code != TSDB_CODE_WAL_LOG_NOT_EXIST) {
    STREAM_CHECK_RET_GOTO(code);
  } else {
    // nothing stored at the requested version: clamp and report success
    if (walGetFirstVer(pWalReader->pWal) > rsp->ver) {
      rsp->ver = walGetFirstVer(pWalReader->pWal);
    }
    ST_TASK_DLOG("vgId:%d %s scan wal error:%s", TD_VID(pVnode), __func__, tstrerror(code));
    code = TSDB_CODE_SUCCESS;
    goto end;
  }

  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->metaBlock, STREAM_RETURN_ROWS_NUM));

  for (;;) {
    code = walNextValidMsg(pWalReader, true);
    if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) {
      // reached the end of the log
      ST_TASK_DLOG("vgId:%d %s scan wal error:%s", TD_VID(pVnode), __func__, tstrerror(code));
      code = TSDB_CODE_SUCCESS;
      goto end;
    }
    STREAM_CHECK_RET_GOTO(code);

    SWalCont* pCont = &pWalReader->pHead->head;
    rsp->ver = pWalReader->curVersion;
    rsp->verTime = pCont->ingestTs;
    if (pCont->ingestTs / 1000 > ctime) {
      break;
    }

    int64_t ver = pCont->version;
    ST_TASK_DLOG("vgId:%d stream reader scan wal ver:%" PRId64 ", type:%d, deleteData:%d, deleteTb:%d",
      TD_VID(pVnode), ver, pCont->msgType, sStreamReaderInfo->deleteReCalc, sStreamReaderInfo->deleteOutTbl);

    if (pCont->msgType != TDMT_VND_SUBMIT) {
      // generic records carry an SMsgHead in front of the payload
      void*   data = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
      int32_t len = pCont->bodyLen - sizeof(SMsgHead);
      STREAM_CHECK_RET_GOTO(processMeta(pCont->msgType, sStreamReaderInfo, data, len, rsp, ver));
    } else {
      // submit records carry an SSubmitReq2Msg in front of the payload
      void*   data = POINTER_SHIFT(pCont->body, sizeof(SSubmitReq2Msg));
      int32_t len = pCont->bodyLen - sizeof(SSubmitReq2Msg);
      STREAM_CHECK_RET_GOTO(scanSubmitDataForMeta(sStreamReaderInfo, rsp, data, len, ver));
    }

    if (rsp->totalRows >= STREAM_RETURN_ROWS_NUM) {
      break;
    }
  }

end:
  walCloseReader(pWalReader);
  return code;
}
834

835
// Fill the tbname / tag output columns of pBlock for rows
// [currentRow, currentRow + numOfRows) belonging to table `uid`.
//
//   isCalc       selects the calc or trigger variant of the meta cache,
//                tag expression list and expression count in `info`
//   api          storage API used to read the table entry on a cache miss
//   numOfBlocks  forwarded unchanged to colDataSetNItems()
//
// Values are cached per uid in the selected meta cache as an SArray of
// pointers, one slot per tag expression. On a miss the table entry is read
// through `api`, and each value is copied into the cache while it is written
// to the block. On a hit the cached pointers are used directly and the cached
// array must have exactly numOfExpr entries (else TSDB_CODE_INVALID_PARA).
//
// Returns 0 immediately when there are no tag expressions; otherwise returns
// `code`, which stays 0 unless a step fails.
//
// NOTE(review): if a miss-path step fails after taosHashPut(), a partially
// filled array stays in the cache; later calls for the same uid then fail the
// size check. Removing the entry here needs knowledge of the cache's value
// ownership, which is not visible in this function.
// NOTE(review): a scan-pseudo-column function other than FUNCTION_TYPE_TBNAME
// pushes nothing into the cache, which would also break the size check.
static int32_t processTag(SVnode* pVnode, SStreamTriggerReaderInfo* info, bool isCalc, SStorageAPI* api,
  uint64_t uid, SSDataBlock* pBlock, uint32_t currentRow, uint32_t numOfRows, uint32_t numOfBlocks) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SMetaReader mr = {0};
  SArray*     tagCache = NULL;

  SHashObj*  metaCache = isCalc ? info->pTableMetaCacheCalc : info->pTableMetaCacheTrigger;
  SExprInfo* pExprInfo = isCalc ? info->pExprInfoCalcTag : info->pExprInfoTriggerTag;
  int32_t    numOfExpr = isCalc ? info->numOfExprCalcTag : info->numOfExprTriggerTag;
  if (numOfExpr == 0) {
    return TSDB_CODE_SUCCESS;
  }

  void* uidData = taosHashGet(metaCache, &uid, LONG_BYTES);
  if (uidData == NULL) {
    // cache miss: read the table entry, then register an empty cache array
    api->metaReaderFn.initReader(&mr, pVnode, META_READER_LOCK, &api->metaFn);
    code = api->metaReaderFn.getEntryGetUidCache(&mr, uid);
    api->metaReaderFn.readerReleaseLock(&mr);
    STREAM_CHECK_RET_GOTO(code);

    tagCache = taosArrayInit(numOfExpr, POINTER_BYTES);
    STREAM_CHECK_NULL_GOTO(tagCache, terrno);
    if(taosHashPut(metaCache, &uid, LONG_BYTES, &tagCache, POINTER_BYTES) != 0) {
      taosArrayDestroyP(tagCache, taosMemFree);
      code = terrno;
      goto end;
    }
  } else {
    tagCache = *(SArray**)uidData;
    STREAM_CHECK_CONDITION_GOTO(taosArrayGetSize(tagCache) != numOfExpr, TSDB_CODE_INVALID_PARA);
  }

  for (int32_t j = 0; j < numOfExpr; ++j) {
    const SExprInfo* pExpr1 = &pExprInfo[j];
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
    STREAM_CHECK_NULL_GOTO(pColInfoData, terrno);
    int32_t functionId = pExpr1->pExpr->_function.functionId;

    // this is to handle the tbname
    if (fmIsScanPseudoColumnFunc(functionId)) {
      int32_t fType = pExpr1->pExpr->_function.functionType;
      if (fType == FUNCTION_TYPE_TBNAME) {
        char   buf[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
        if (uidData == NULL) {
          STR_TO_VARSTR(buf, mr.me.name)
          char* tbname = taosStrdup(mr.me.name);
          STREAM_CHECK_NULL_GOTO(tbname, terrno);
          if (taosArrayPush(tagCache, &tbname) == NULL) {
            // the copy never reached the cache, so release it here
            code = terrno;
            lino = __LINE__;
            taosMemoryFree(tbname);
            goto end;
          }
        } else {
          char* tbname = taosArrayGetP(tagCache, j);
          STR_TO_VARSTR(buf, tbname)
        }
        for (uint32_t i = 0; i < numOfRows; i++){
          colDataClearNull_f(pColInfoData->nullbitmap, currentRow + i);
        }
        code = colDataSetNItems(pColInfoData, currentRow, buf, numOfRows, numOfBlocks, false);
        pColInfoData->info.colId = -1;
        // stop on failure instead of letting a later iteration overwrite code
        STREAM_CHECK_RET_GOTO(code);
      }
    } else {  // these are tags
      char* data = NULL;
      const char* p = NULL;
      STagVal tagVal = {0};
      // true when `data` must be released by this function; uses the same
      // condition the success path has always used for its free
      bool ownsData = false;
      if (uidData == NULL) {
        tagVal.cid = pExpr1->base.pParam[0].pCol->colId;
        p = api->metaFn.extractTagVal(mr.me.ctbEntry.pTags, pColInfoData->info.type, &tagVal);

        if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
          data = tTagValToData((const STagVal*)p, false);
          ownsData = IS_VAR_DATA_TYPE(((const STagVal*)p)->type);
        } else {
          data = (char*)p;
        }

        if (data == NULL) {
          // a NULL slot keeps the cache aligned with the expression index
          STREAM_CHECK_NULL_GOTO(taosArrayPush(tagCache, &data), terrno);
        } else {
          int32_t len = pColInfoData->info.bytes;
          if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
            len = calcStrBytesByType(pColInfoData->info.type, (char*)data);
          }
          char* pData = taosMemoryCalloc(1, len);
          if (pData == NULL) {
            code = terrno;
            lino = __LINE__;
            if (ownsData) {
              taosMemoryFree(data);
            }
            goto end;
          }
          (void)memcpy(pData, data, len);
          if (taosArrayPush(tagCache, &pData) == NULL) {
            code = terrno;
            lino = __LINE__;
            taosMemoryFree(pData);
            if (ownsData) {
              taosMemoryFree(data);
            }
            goto end;
          }
        }
      } else {
        data = taosArrayGetP(tagCache, j);
      }

      bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
      if (isNullVal) {
        colDataSetNNULL(pColInfoData, currentRow, numOfRows);
      } else {
        for (uint32_t i = 0; i < numOfRows; i++){
          colDataClearNull_f(pColInfoData->nullbitmap, currentRow + i);
        }
        code = colDataSetNItems(pColInfoData, currentRow, data, numOfRows, numOfBlocks, false);
        if (ownsData) {
          taosMemoryFree(data);
        }
        STREAM_CHECK_RET_GOTO(code);
      }
    }
  }

end:
  // mr is zero-initialised, so this is also reached when no reader was opened
  api->metaReaderFn.clearReader(&mr);
  return code;
}
946

947
// Compute which rows of a timestamp column fall inside a time window.
//
//   pCol      column whose values are read as timestamps
//   window    optional filter; NULL selects every row
//   rowStart  out: first row with ts >= window->skey (0 when window is NULL)
//   rowEnd    out: first row with ts > window->ekey, or pCol->nVal when no
//             such row exists (pCol->nVal when window is NULL)
//   nRows     out: *rowEnd - *rowStart; left at 0 when no row qualifies
//
// When no row reaches window->skey, or the start and end rows coincide, the
// function returns early with *nRows == 0 and *rowStart / *rowEnd left as
// found by the scan (possibly -1).
// Returns `code`, which stays 0 unless tColDataGetValue() fails.
int32_t getRowRange(SColData* pCol, STimeWindow* window, int32_t* rowStart, int32_t* rowEnd, int32_t* nRows) {
  int32_t code = 0;
  int32_t lino = 0;

  *nRows = 0;
  if (window == NULL) {
    *rowStart = 0;
    *rowEnd = pCol->nVal;
  } else {
    SColVal cell = {0};
    *rowStart = -1;
    *rowEnd = -1;
    for (int32_t row = 0; row < pCol->nVal; ++row) {
      STREAM_CHECK_RET_GOTO(tColDataGetValue(pCol, row, &cell));
      int64_t ts = VALUE_GET_TRIVIAL_DATUM(&cell.value);
      if (*rowStart == -1 && ts >= window->skey) {
        *rowStart = row;
      }
      if (*rowEnd == -1 && ts > window->ekey) {
        *rowEnd = row;
      }
    }
    STREAM_CHECK_CONDITION_GOTO(*rowStart == -1 || *rowStart == *rowEnd, TDB_CODE_SUCCESS);

    // a start row exists past this point; an open end means "through the last row"
    if (*rowEnd == -1) {
      *rowEnd = pCol->nVal;
    }
  }
  *nRows = *rowEnd - *rowStart;

end:
  return code;
}
978

979
// Copy rows [rowStart, rowEnd) of a decoded column into an output column.
//
//   rows      index of the first destination row in pColData
//   rowStart  first source row to copy (inclusive)
//   rowEnd    last source row to copy (exclusive)
//   colData   decoded source column
//   pColData  destination column
//
// Source row k lands at destination row rows + (k - rowStart), so the copied
// values occupy [rows, rows + (rowEnd - rowStart)). That is the range the
// visible caller fills with NULLs, tags and the version for the same batch.
// The previous `rows + k` shifted the values by rowStart rows whenever the
// time window skipped leading rows. Behaviour is unchanged for rowStart == 0.
//
// Returns `code`, which stays 0 unless reading or writing a value fails.
static int32_t setColData(int64_t rows, int32_t rowStart, int32_t rowEnd, SColData* colData, SColumnInfoData* pColData) {
  int32_t code = 0;
  int32_t lino = 0;
  for (int32_t k = rowStart; k < rowEnd; k++) {
    SColVal colVal = {0};
    STREAM_CHECK_RET_GOTO(tColDataGetValue(colData, k, &colVal));
    STREAM_CHECK_RET_GOTO(colDataSetVal(pColData, rows + (k - rowStart), VALUE_GET_DATUM(&colVal.value, colVal.value.type),
                                        !COL_VAL_IS_VALUE(&colVal)));
  }
end:
  return code;
}
991

992
static int32_t scanSubmitTbData(SVnode* pVnode, SDecoder *pCoder, SStreamTriggerReaderInfo* sStreamReaderInfo, 
27,431,245✔
993
  STSchema** schemas, SSHashObj* ranges, SSHashObj* gidHash, SSTriggerWalNewRsp* rsp, int64_t ver) {
994
  int32_t code = 0;
27,431,245✔
995
  int32_t lino = 0;
27,431,245✔
996
  uint64_t id = 0;
27,431,245✔
997
  WalMetaResult walMeta = {0};
27,441,948✔
998
  void* pTask = sStreamReaderInfo->pTask;
27,461,995✔
999
  SSDataBlock * pBlock = (SSDataBlock*)rsp->dataBlock;
27,466,014✔
1000

1001
  if (tStartDecode(pCoder) < 0) {
27,460,669!
1002
    code = TSDB_CODE_INVALID_MSG;
×
1003
    TSDB_CHECK_CODE(code, lino, end);
×
1004
  }
1005

1006
  SSubmitTbData submitTbData = {0};
27,476,643✔
1007
  uint8_t       version = 0;
27,464,610✔
1008
  if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) {
27,468,716!
1009
    code = TSDB_CODE_INVALID_MSG;
×
1010
    TSDB_CHECK_CODE(code, lino, end);
×
1011
  }
1012
  version = (submitTbData.flags >> 8) & 0xff;
27,468,716✔
1013
  submitTbData.flags = submitTbData.flags & 0xff;
27,468,716✔
1014
  // STREAM_CHECK_CONDITION_GOTO(version < 2, TDB_CODE_SUCCESS);
1015
  if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
27,468,716✔
1016
    if (tStartDecode(pCoder) < 0) {
90,084!
1017
      code = TSDB_CODE_INVALID_MSG;
×
1018
      TSDB_CHECK_CODE(code, lino, end);
×
1019
    }
1020
    tEndDecode(pCoder);
90,084✔
1021
  }
1022

1023
  // submit data
1024
  if (tDecodeI64(pCoder, &submitTbData.suid) < 0) {
27,451,221!
1025
    code = TSDB_CODE_INVALID_MSG;
×
1026
    TSDB_CHECK_CODE(code, lino, end);
×
1027
  }
1028
  if (tDecodeI64(pCoder, &submitTbData.uid) < 0) {
27,471,303!
1029
    code = TSDB_CODE_INVALID_MSG;
×
1030
    TSDB_CHECK_CODE(code, lino, end);
×
1031
  }
1032

1033
  STREAM_CHECK_CONDITION_GOTO(!uidInTableList(sStreamReaderInfo, submitTbData.suid, submitTbData.uid, &id, rsp->isCalc), TDB_CODE_SUCCESS);
27,471,303✔
1034

1035
  walMeta.id = id;
23,054,503✔
1036
  STimeWindow window = {.skey = INT64_MIN, .ekey = INT64_MAX};
23,054,503✔
1037

1038
  if (ranges != NULL){
23,061,159✔
1039
    void* timerange = tSimpleHashGet(ranges, &id, sizeof(id));
17,569,551✔
1040
    if (timerange == NULL) goto end;;
17,570,889!
1041
    int64_t* pRange = (int64_t*)timerange;
17,570,889✔
1042
    window.skey = pRange[0];
17,570,889✔
1043
    window.ekey = pRange[1];
17,572,225✔
1044
  }
1045
  
1046
  if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) {
23,045,197!
1047
    code = TSDB_CODE_INVALID_MSG;
×
1048
    TSDB_CHECK_CODE(code, lino, end);
×
1049
  }
1050

1051
  if (*schemas == NULL) {
23,045,197✔
1052
    *schemas = metaGetTbTSchema(pVnode->pMeta, submitTbData.suid != 0 ? submitTbData.suid : submitTbData.uid, submitTbData.sver, 1);
23,049,144✔
1053
    STREAM_CHECK_NULL_GOTO(*schemas, TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND);
23,054,450!
1054
  }
1055

1056
  SStreamWalDataSlice* pSlice = (SStreamWalDataSlice*)tSimpleHashGet(sStreamReaderInfo->indexHash, &submitTbData.uid, LONG_BYTES);
23,057,031✔
1057
  STREAM_CHECK_NULL_GOTO(pSlice, TSDB_CODE_INVALID_PARA);
23,065,168!
1058
  int32_t blockStart = pSlice->currentRowIdx;
23,065,168✔
1059

1060
  int32_t numOfRows = 0;
23,063,829✔
1061
  if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
23,053,201!
1062
    uint64_t nColData = 0;
×
1063
    if (tDecodeU64v(pCoder, &nColData) < 0) {
×
1064
      code = TSDB_CODE_INVALID_MSG;
×
1065
      TSDB_CHECK_CODE(code, lino, end);
×
1066
    }
1067

1068
    SColData colData = {0};
×
1069
    code = tDecodeColData(version, pCoder, &colData, false);
×
1070
    if (code) {
×
1071
      code = TSDB_CODE_INVALID_MSG;
×
1072
      TSDB_CHECK_CODE(code, lino, end);
×
1073
    }
1074

1075
    if (colData.flag != HAS_VALUE) {
×
1076
      code = TSDB_CODE_INVALID_MSG;
×
1077
      TSDB_CHECK_CODE(code, lino, end);
×
1078
    }
1079
    
1080
    walMeta.skey = ((TSKEY *)colData.pData)[0];
×
1081
    walMeta.ekey = ((TSKEY *)colData.pData)[colData.nVal - 1];
×
1082

1083
    int32_t rowStart = 0;
×
1084
    int32_t rowEnd = 0;
×
1085
    STREAM_CHECK_RET_GOTO(getRowRange(&colData, &window, &rowStart, &rowEnd, &numOfRows));
×
1086
    STREAM_CHECK_CONDITION_GOTO(numOfRows <= 0, TDB_CODE_SUCCESS);
×
1087

1088
    int32_t pos = pCoder->pos;
×
1089
    for (int16_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
×
1090
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
×
1091
      STREAM_CHECK_NULL_GOTO(pColData, terrno);
×
1092
      if (pColData->info.colId <= -1) {
×
1093
        pColData->hasNull = true;
×
1094
        continue;
×
1095
      }
1096
      if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
×
1097
        STREAM_CHECK_RET_GOTO(setColData(blockStart, rowStart, rowEnd, &colData, pColData));
×
1098
        continue;
×
1099
      }
1100

1101
      pCoder->pos = pos;
×
1102

1103
      int16_t colId = 0;
×
1104
      if (sStreamReaderInfo->isVtableStream){
×
1105
        int64_t id[2] = {submitTbData.suid, submitTbData.uid};
×
1106
        void *px = tSimpleHashGet(rsp->isCalc ? sStreamReaderInfo->uidHashCalc : sStreamReaderInfo->uidHashTrigger, id, sizeof(id));
×
1107
        STREAM_CHECK_NULL_GOTO(px, TSDB_CODE_INVALID_PARA);
×
1108
        SSHashObj* uInfo = *(SSHashObj **)px;
×
1109
        STREAM_CHECK_NULL_GOTO(uInfo, TSDB_CODE_INVALID_PARA);
×
1110
        int16_t*  tmp = tSimpleHashGet(uInfo, &i, sizeof(i));
×
1111
        if (tmp != NULL) {
×
1112
          colId = *tmp;
×
1113
        } else {
1114
          colId = -1;
×
1115
        }
1116
      } else {
1117
        colId = pColData->info.colId;
×
1118
      }
1119
      
1120
      uint64_t j = 1;
×
1121
      for (; j < nColData; j++) {
×
1122
        int16_t cid = 0;
×
1123
        int32_t posTmp = pCoder->pos;
×
1124
        pCoder->pos += INT_BYTES;
×
1125
        if ((code = tDecodeI16v(pCoder, &cid))) return code;
×
1126
        pCoder->pos = posTmp;
×
1127
        if (cid == colId) {
×
1128
          SColData colDataTmp = {0};
×
1129
          code = tDecodeColData(version, pCoder, &colDataTmp, false);
×
1130
          if (code) {
×
1131
            code = TSDB_CODE_INVALID_MSG;
×
1132
            TSDB_CHECK_CODE(code, lino, end);
×
1133
          }
1134
          STREAM_CHECK_RET_GOTO(setColData(blockStart, rowStart, rowEnd, &colDataTmp, pColData));
×
1135
          break;
×
1136
        }
1137
        code = tDecodeColData(version, pCoder, &colData, true);
×
1138
        if (code) {
×
1139
          code = TSDB_CODE_INVALID_MSG;
×
1140
          TSDB_CHECK_CODE(code, lino, end);
×
1141
        }
1142
      }
1143
      if (j == nColData) {
×
1144
        colDataSetNNULL(pColData, blockStart, numOfRows);
×
1145
      }
1146
    }
1147
  } else {
1148
    uint64_t nRow = 0;
23,053,201✔
1149
    if (tDecodeU64v(pCoder, &nRow) < 0) {
23,053,041!
1150
      code = TSDB_CODE_INVALID_MSG;
×
1151
      TSDB_CHECK_CODE(code, lino, end);
×
1152
    }
1153
    for (int32_t iRow = 0; iRow < nRow; ++iRow) {
52,792,849✔
1154
      SRow *pRow = (SRow *)(pCoder->data + pCoder->pos);
29,706,120✔
1155
      pCoder->pos += pRow->len;
29,720,818✔
1156

1157
      if (iRow == 0){
29,738,471✔
1158
#ifndef NO_UNALIGNED_ACCESS
1159
        walMeta.skey = pRow->ts;
23,065,166✔
1160
#else
1161
        walMeta.skey = taosGetInt64Aligned(&pRow->ts);
1162
#endif
1163
      }
1164
      if (iRow == nRow - 1) {
29,722,155✔
1165
#ifndef NO_UNALIGNED_ACCESS
1166
        walMeta.ekey = pRow->ts;
23,055,621✔
1167
#else
1168
        walMeta.ekey = taosGetInt64Aligned(&pRow->ts);
1169
#endif
1170
      }
1171

1172
      if (pRow->ts < window.skey || pRow->ts > window.ekey) {
29,724,935✔
1173
        continue;
45,392✔
1174
      }
1175
     
1176
      for (int16_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {  // reader todo test null
200,142,117✔
1177
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
170,455,932✔
1178
        STREAM_CHECK_NULL_GOTO(pColData, terrno);
170,437,201!
1179
        if (pColData->info.colId <= -1) {
170,437,201✔
1180
          pColData->hasNull = true;
54,905,887✔
1181
          continue;
54,906,073✔
1182
        }
1183
        int16_t colId = 0;
115,584,294✔
1184
        if (sStreamReaderInfo->isVtableStream){
115,584,294!
1185
          int64_t id[2] = {submitTbData.suid, submitTbData.uid};
37,568,837✔
1186
          void* px = tSimpleHashGet(rsp->isCalc ? sStreamReaderInfo->uidHashCalc : sStreamReaderInfo->uidHashTrigger, id, sizeof(id));
37,568,837!
1187
          STREAM_CHECK_NULL_GOTO(px, TSDB_CODE_INVALID_PARA);
37,574,049!
1188
          SSHashObj* uInfo = *(SSHashObj**)px;
37,574,049✔
1189
          STREAM_CHECK_NULL_GOTO(uInfo, TSDB_CODE_INVALID_PARA);
37,574,049!
1190
          int16_t*  tmp = tSimpleHashGet(uInfo, &i, sizeof(i));
37,574,049✔
1191
          if (tmp != NULL) {
37,566,164✔
1192
            colId = *tmp;
34,575,170✔
1193
          } else {
1194
            colId = -1;
2,990,994✔
1195
          }
1196
          ST_TASK_TLOG("%s vtable colId:%d, i:%d, uid:%" PRId64, __func__, colId, i, submitTbData.uid);
37,572,848!
1197
        } else {
1198
          colId = pColData->info.colId;
78,014,188✔
1199
        }
1200
        
1201
        SColVal colVal = {0};
115,577,680✔
1202
        int32_t sourceIdx = 0;
115,575,075✔
1203
        while (1) {
1204
          if (sourceIdx >= (*schemas)->numOfCols) {
307,039,903✔
1205
            break;
43,241,035✔
1206
          }
1207
          STREAM_CHECK_RET_GOTO(tRowGet(pRow, *schemas, sourceIdx, &colVal));
263,794,113!
1208
          if (colVal.cid == colId) {
263,819,752✔
1209
            break;
72,354,924✔
1210
          }
1211
          sourceIdx++;
191,464,828✔
1212
        }
1213
        if (colVal.cid == colId && COL_VAL_IS_VALUE(&colVal)) {
115,595,959✔
1214
          if (IS_VAR_DATA_TYPE(colVal.value.type) || colVal.value.type == TSDB_DATA_TYPE_DECIMAL){
68,638,741!
1215
            STREAM_CHECK_RET_GOTO(varColSetVarData(pColData, blockStart+ numOfRows, (const char*)colVal.value.pData, colVal.value.nData, !COL_VAL_IS_VALUE(&colVal)));
106,540!
1216
          } else {
1217
            STREAM_CHECK_RET_GOTO(colDataSetVal(pColData, blockStart + numOfRows, (const char*)(&(colVal.value.val)), !COL_VAL_IS_VALUE(&colVal)));
68,532,206!
1218
          }
1219
        } else {
1220
          colDataSetNULL(pColData, blockStart + numOfRows);
46,957,218!
1221
        }
1222
      }
1223
      
1224
      numOfRows++;
29,694,416✔
1225
    }
1226
  }
1227

1228
  if (numOfRows > 0) {
23,062,493!
1229
    if (!sStreamReaderInfo->isVtableStream) {
23,062,493✔
1230
      SStorageAPI  api = {0};
10,362,347✔
1231
      initStorageAPI(&api);
10,357,965✔
1232
      STREAM_CHECK_RET_GOTO(processTag(pVnode, sStreamReaderInfo, rsp->isCalc, &api, submitTbData.uid, pBlock, blockStart, numOfRows, 1));
10,360,918!
1233
    }
1234
    
1235
    SColumnInfoData* pColData = taosArrayGetLast(pBlock->pDataBlock);
23,058,386✔
1236
    STREAM_CHECK_NULL_GOTO(pColData, terrno);
23,060,879!
1237
    STREAM_CHECK_RET_GOTO(colDataSetNItems(pColData, blockStart, (const char*)&ver, numOfRows, 1, false));
23,060,879!
1238
  }
1239

1240
  ST_TASK_DLOG("%s process submit data:skey %" PRId64 ", ekey %" PRId64 ", id %" PRIu64
23,066,505✔
1241
    ", uid:%" PRId64 ", ver:%d, row index:%d, rows:%d", __func__, window.skey, window.ekey, 
1242
    id, submitTbData.uid, submitTbData.sver, pSlice->currentRowIdx, numOfRows);
1243
  pSlice->currentRowIdx += numOfRows;
23,065,236✔
1244
  pBlock->info.rows += numOfRows;
23,058,482✔
1245
  
1246
  if (gidHash == NULL) goto end;
23,063,832✔
1247

1248
  WalMetaResult* data = (WalMetaResult*)tSimpleHashGet(gidHash, &walMeta.id, LONG_BYTES);
5,492,944✔
1249
  if (data != NULL) {
5,491,605!
1250
    if (walMeta.skey < data->skey) data->skey = walMeta.skey;
×
1251
    if (walMeta.ekey > data->ekey) data->ekey = walMeta.ekey;
×
1252
  } else {
1253
    STREAM_CHECK_RET_GOTO(tSimpleHashPut(gidHash, &walMeta.id, LONG_BYTES, &walMeta, sizeof(WalMetaResult)));
5,491,605!
1254
  }
1255

1256
end:
27,458,587✔
1257
  if (code != 0) {                                                             \
27,459,240!
1258
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code)); \
×
1259
  }
1260
  tEndDecode(pCoder);
27,459,240✔
1261
  return code;
27,459,232✔
1262
}
1263
// Decode a whole submit request and feed each per-table entry to
// scanSubmitTbData().
//
//   data, len  encoded submit request
//   ranges     forwarded to scanSubmitTbData() as the per-id time filter
//   rsp        response being assembled; when rsp->metaBlock is set, one
//              meta row per id touched by this request is appended to it
//   ver        WAL version of the request
//
// The schema obtained by the first scanSubmitTbData() call is reused for the
// remaining entries and released here, as are the per-request id map and the
// decoder. Returns `code`, which stays 0 unless a step fails.
//
// NOTE(review): a single schema is shared by all table entries of the
// request; confirm a request cannot mix tables with different schemas.
static int32_t scanSubmitData(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamReaderInfo,
  void* data, int32_t len, SSHashObj* ranges, SSTriggerWalNewRsp* rsp, int64_t ver) {
  int32_t    code = 0;
  int32_t    lino = 0;
  void*      pTask = sStreamReaderInfo->pTask;  // not referenced by name below; presumably consumed by the ST_TASK_* log macros
  STSchema*  pRowSchema = NULL;
  SSHashObj* pIdMeta = NULL;
  SDecoder   decoder = {0};
  uint64_t   nSubmitTbData = 0;

  tDecoderInit(&decoder, data, len);
  if (tStartDecode(&decoder) < 0 || tDecodeU64v(&decoder, &nSubmitTbData) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  // the id map is only needed when meta rows are requested
  if (rsp->metaBlock != NULL) {
    pIdMeta = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    STREAM_CHECK_NULL_GOTO(pIdMeta, terrno);
  }

  for (int32_t iTb = 0; iTb < nSubmitTbData; ++iTb) {
    STREAM_CHECK_RET_GOTO(scanSubmitTbData(pVnode, &decoder, sStreamReaderInfo, &pRowSchema, ranges, pIdMeta, rsp, ver));
  }

  tEndDecode(&decoder);

  if (rsp->metaBlock == NULL) {
    goto end;
  }

  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->metaBlock, ((SSDataBlock*)rsp->metaBlock)->info.rows + tSimpleHashGetSize(pIdMeta)));

  int32_t iter = 0;
  for (void* px = tSimpleHashIterate(pIdMeta, NULL, &iter); px != NULL; px = tSimpleHashIterate(pIdMeta, px, &iter)) {
    WalMetaResult* pMeta = (WalMetaResult*)px;
    STREAM_CHECK_RET_GOTO(buildWalMetaBlockNew(rsp->metaBlock, pMeta->id, pMeta->skey, pMeta->ekey, ver));
    ((SSDataBlock*)rsp->metaBlock)->info.rows++;
    ST_TASK_DLOG("%s process meta data:skey %" PRId64 ", ekey %" PRId64 ", id %" PRIu64
          ", ver:%"PRId64, __func__, pMeta->skey, pMeta->ekey, pMeta->id, ver);
  }

end:
  taosMemoryFree(pRowSchema);
  tSimpleHashCleanup(pIdMeta);
  tDecoderClear(&decoder);
  return code;
}
1316

1317
/*
 * Pre-scan a single SSubmitTbData entry of a submit request and count the rows
 * it contributes, without building any data block.
 *
 * pCoder            decoder positioned at the start of the entry
 * sStreamReaderInfo reader context; used for auto-create-table handling and the
 *                   table-list lookup
 * ranges            optional hash looked up by *gid; the value is read as two
 *                   consecutive int64_t values {skey, ekey}. NULL disables the
 *                   time-window filter
 * gid               group id, handed to uidInTableList() and used as the key
 *                   into `ranges` (presumably filled by uidInTableList - confirm)
 * uid               out: table uid decoded from the entry
 * numOfRows         out: row count. In the row-format + window case it is
 *                   incremented per matching row, so the caller must pass 0
 * isCalc            forwarded unchanged to uidInTableList()
 *
 * Returns 0 on success (including "table not of interest", which leaves
 * *numOfRows untouched), TSDB_CODE_INVALID_MSG on malformed input, or the
 * error of a failing helper.
 */
static int32_t scanSubmitTbDataPre(SDecoder *pCoder, SStreamTriggerReaderInfo* sStreamReaderInfo, SSHashObj* ranges, 
  uint64_t* gid, int64_t* uid, int32_t* numOfRows, bool isCalc) {
  int32_t code = 0;
  int32_t lino = 0;
  void* pTask = sStreamReaderInfo->pTask;

  // Initialised before the first possible `goto end`: the cleanup code reads
  // submitTbData.pCreateTbReq, which must never be indeterminate.
  SSubmitTbData submitTbData = {0};
  uint8_t       version = 0;

  // NOTE(review): when tStartDecode fails, `end` still calls tEndDecode - confirm that is harmless.
  if (tStartDecode(pCoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }
  // Bits 8..15 of the encoded flags carry the format version, bits 0..7 the flags proper.
  version = (submitTbData.flags >> 8) & 0xff;
  submitTbData.flags = submitTbData.flags & 0xff;

  if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
    submitTbData.pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
    STREAM_CHECK_NULL_GOTO(submitTbData.pCreateTbReq, terrno);
    STREAM_CHECK_RET_GOTO(tDecodeSVCreateTbReq(pCoder, submitTbData.pCreateTbReq));
    STREAM_CHECK_RET_GOTO(processAutoCreateTableNew(sStreamReaderInfo, submitTbData.pCreateTbReq));
  }

  // submit data
  if (tDecodeI64(pCoder, &submitTbData.suid) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }
  if (tDecodeI64(pCoder, uid) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  // Tables outside the reader's table list are skipped without error.
  STREAM_CHECK_CONDITION_GOTO(!uidInTableList(sStreamReaderInfo, submitTbData.suid, *uid, gid, isCalc), TDB_CODE_SUCCESS);

  STimeWindow window = {.skey = INT64_MIN, .ekey = INT64_MAX};

  if (ranges != NULL){
    void* timerange = tSimpleHashGet(ranges, gid, sizeof(*gid));
    // No range registered for this group: nothing to count.
    if (timerange == NULL) goto end;
    int64_t* pRange = (int64_t*)timerange;
    window.skey = pRange[0];
    window.ekey = pRange[1];
  }
  
  if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  bool hasWindow = (window.skey != INT64_MIN || window.ekey != INT64_MAX);

  if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    uint64_t nColData = 0;
    if (tDecodeU64v(pCoder, &nColData) < 0) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, end);
    }

    // Only the first column is decoded; its nVal / contents drive the row count.
    SColData colData = {0};
    code = tDecodeColData(version, pCoder, &colData, false);
    if (code) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, end);
    }

    if (colData.flag != HAS_VALUE) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, end);
    }
    int32_t rowStart = 0;
    int32_t rowEnd = 0;
    if (hasWindow) {
      STREAM_CHECK_RET_GOTO(getRowRange(&colData, &window, &rowStart, &rowEnd, numOfRows));
    } else {
      (*numOfRows) = colData.nVal;
    } 
  } else {
    uint64_t nRow = 0;
    if (tDecodeU64v(pCoder, &nRow) < 0) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, end);
    }

    if (hasWindow) { 
      // Walk the packed rows in place; the index has the same type as nRow so
      // the comparison is not a mixed signed/unsigned one.
      // NOTE(review): pRow->len is trusted here; there is no bounds check against the buffer.
      for (uint64_t iRow = 0; iRow < nRow; ++iRow) {
        SRow *pRow = (SRow *)(pCoder->data + pCoder->pos);
        pCoder->pos += pRow->len;
        if (pRow->ts < window.skey || pRow->ts > window.ekey) {
          continue;
        }
        (*numOfRows)++;
      }
    } else {
      (*numOfRows) = nRow;
    }
  }
  
end:
  tDestroySVSubmitCreateTbReq(submitTbData.pCreateTbReq, TSDB_MSG_FLG_DECODE);
  taosMemoryFreeClear(submitTbData.pCreateTbReq);
  tEndDecode(pCoder);
  return code;
}
1424

1425
/*
 * Pre-scan a serialized submit request and accumulate per-table row counts.
 *
 * For every SSubmitTbData entry the row count is obtained from
 * scanSubmitTbDataPre(). Entries with a positive count are added to
 * rsp->totalRows and to the slice stored under the table uid in
 * sStreamReaderInfo->indexHash (a new slice is inserted on first sight).
 *
 * data/len  encoded request body
 * ranges    forwarded to scanSubmitTbDataPre(); may be NULL
 * rsp       response whose totalRows is updated; rsp->isCalc is forwarded
 *
 * Returns 0 on success, TSDB_CODE_INVALID_MSG on a decode failure, or the
 * error of a failing helper.
 */
static int32_t scanSubmitDataPre(SStreamTriggerReaderInfo* sStreamReaderInfo, void* data, int32_t len, SSHashObj* ranges, SSTriggerWalNewRsp* rsp) {
  int32_t  code = 0;
  int32_t  lino = 0;
  void*    pTask = sStreamReaderInfo->pTask;
  SDecoder decoder = {0};
  uint64_t nSubmitTbData = 0;

  tDecoderInit(&decoder, data, len);
  if (tStartDecode(&decoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  if (tDecodeU64v(&decoder, &nSubmitTbData) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  for (int32_t tbIdx = 0; tbIdx < nSubmitTbData; tbIdx++) {
    uint64_t gid = UINT64_MAX;
    int64_t  uid = 0;
    int32_t  numOfRows = 0;

    STREAM_CHECK_RET_GOTO(scanSubmitTbDataPre(&decoder, sStreamReaderInfo, ranges, &gid, &uid, &numOfRows, rsp->isCalc));
    if (numOfRows > 0) {
      rsp->totalRows += numOfRows;

      SStreamWalDataSlice* pSlice = (SStreamWalDataSlice*)tSimpleHashGet(sStreamReaderInfo->indexHash, &uid, LONG_BYTES);
      if (pSlice == NULL) {
        // First rows seen for this table: register a fresh slice.
        SStreamWalDataSlice fresh = {.gId = gid, .numRows = numOfRows, .currentRowIdx = 0, .startRowIdx = 0};
        ST_TASK_DLOG("%s first uid:%" PRId64 ", gid:%" PRIu64 ", numOfRows:%d", __func__, uid, gid, fresh.numRows);
        STREAM_CHECK_RET_GOTO(tSimpleHashPut(sStreamReaderInfo->indexHash, &uid, LONG_BYTES, &fresh, sizeof(fresh)));
      } else {
        // Table already known: extend its slice and refresh the group id.
        pSlice->numRows += numOfRows;
        ST_TASK_DLOG("%s again uid:%" PRId64 ", gid:%" PRIu64 ", total numOfRows:%d", __func__, uid, gid, pSlice->numRows);
        pSlice->gId = gid;
      }
    }
  }

  tEndDecode(&decoder);

end:
  tDecoderClear(&decoder);
  return code;
}
1471

1472
/*
 * Clear every slice stored in indexHash: row indexes and row count are set to
 * 0 and the group id to -1. The entries themselves stay in the hash.
 */
static void resetIndexHash(SSHashObj* indexHash){
  int32_t iter = 0;
  for (void* pEntry = tSimpleHashIterate(indexHash, NULL, &iter); pEntry != NULL;
       pEntry = tSimpleHashIterate(indexHash, pEntry, &iter)) {
    SStreamWalDataSlice* pSlice = (SStreamWalDataSlice*)pEntry;
    pSlice->startRowIdx = 0;
    pSlice->currentRowIdx = 0;
    pSlice->numRows = 0;
    pSlice->gId = -1;
  }
}
21,937,057✔
1483

1484
/*
 * Assign each slice in indexHash a contiguous row range: in iteration order,
 * startRowIdx and currentRowIdx are set to the running total of numRows of the
 * slices visited before it. pTask is only needed by the logging macro.
 */
static void buildIndexHash(SSHashObj* indexHash, void* pTask){
  int32_t iter = 0;
  int32_t nextStart = 0;
  for (void* pEntry = tSimpleHashIterate(indexHash, NULL, &iter); pEntry != NULL;
       pEntry = tSimpleHashIterate(indexHash, pEntry, &iter)) {
    SStreamWalDataSlice* pSlice = (SStreamWalDataSlice*)pEntry;
    pSlice->startRowIdx = nextStart;
    pSlice->currentRowIdx = nextStart;
    nextStart += pSlice->numRows;
    ST_TASK_DLOG("%s uid:%" PRId64 ", gid:%" PRIu64 ", startRowIdx:%d, numRows:%d", __func__, *(int64_t*)(tSimpleHashGetKey(pEntry, NULL)),
    pSlice->gId, pSlice->startRowIdx, pSlice->numRows);
  }
}
1,775,963✔
1497

1498
/*
 * Debug helper: log uid, group id, start row index and row count of every
 * slice in indexHash. Nothing is modified. pTask is only needed by the
 * logging macro.
 */
static void printIndexHash(SSHashObj* indexHash, void* pTask){
  int32_t iter = 0;
  for (void* pEntry = tSimpleHashIterate(indexHash, NULL, &iter); pEntry != NULL;
       pEntry = tSimpleHashIterate(indexHash, pEntry, &iter)) {
    SStreamWalDataSlice* pSlice = (SStreamWalDataSlice*)pEntry;
    ST_TASK_DLOG("%s uid:%" PRId64 ", gid:%" PRIu64 ", startRowIdx:%d, numRows:%d", __func__, *(int64_t*)(tSimpleHashGetKey(pEntry, NULL)),
    pSlice->gId, pSlice->startRowIdx, pSlice->numRows);
  }
}
1,771,940✔
1507

1508
/*
 * Recompute the slices in indexHash after rows were filtered out.
 *
 * pRet->pData is read as one int8_t flag per pre-filter row, consumed in slice
 * iteration order; a zero flag means the row was dropped. Each slice gets its
 * numRows reduced to the surviving rows and its startRowIdx set to the running
 * total of surviving rows before it. With a NULL flag buffer all rows are kept.
 * currentRowIdx is not touched here.
 */
static void filterIndexHash(SSHashObj* indexHash, SColumnInfoData* pRet){
  int8_t* pKeepFlags = (int8_t*)pRet->pData;
  int32_t iter = 0;
  int32_t nextStart = 0;   // start index for the slice being processed
  int32_t flagCursor = 0;  // position in pKeepFlags, shared across slices

  for (void* pEntry = tSimpleHashIterate(indexHash, NULL, &iter); pEntry != NULL;
       pEntry = tSimpleHashIterate(indexHash, pEntry, &iter)) {
    SStreamWalDataSlice* pSlice = (SStreamWalDataSlice*)pEntry;
    int32_t              before = pSlice->numRows;
    int32_t              kept = 0;

    pSlice->startRowIdx = nextStart;
    for (int32_t row = 0; row < before; row++) {
      if (pKeepFlags == NULL || pKeepFlags[flagCursor++]) {
        kept++;
      }
    }
    pSlice->numRows = kept;
    nextStart += kept;
    stTrace("stream reader re build index hash uid:%" PRId64 ", gid:%" PRIu64 ", startRowIdx:%d, numRows:%d", *(int64_t*)(tSimpleHashGetKey(pEntry, NULL)),
    pSlice->gId, pSlice->startRowIdx, pSlice->numRows);
  }
}
44,769✔
1529

1530
/*
 * First pass over the WAL starting at resultRsp->ver: count data rows per
 * table (submit messages) and process meta messages, advancing resultRsp->ver
 * and resultRsp->verTime as messages are consumed.
 *
 * The scan stops when
 *   - the WAL has no (more) entries (TSDB_CODE_WAL_LOG_NOT_EXIST, treated as
 *     success),
 *   - an ALTER TABLE message is met after rows were already collected (the
 *     version is stepped back by one before stopping), or
 *   - resultRsp->totalRows reaches STREAM_RETURN_ROWS_NUM.
 *
 * Returns 0 on success or the first error reported by a helper.
 */
static int32_t prepareIndexMetaData(SWalReader* pWalReader, SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* resultRsp){
  int32_t code = 0;
  int32_t lino = 0;
  void*   pTask = sStreamReaderInfo->pTask;

  code = walReaderSeekVer(pWalReader, resultRsp->ver);
  if (code == TSDB_CODE_WAL_LOG_NOT_EXIST){
    // Requested version is gone: move the cursor up to the first retained one.
    if (resultRsp->ver < walGetFirstVer(pWalReader->pWal)) {
      resultRsp->ver = walGetFirstVer(pWalReader->pWal);
    }
    ST_TASK_DLOG("%s scan wal error:%s",  __func__, tstrerror(code));
    code = TSDB_CODE_SUCCESS;
    goto end;
  }
  STREAM_CHECK_RET_GOTO(code);

  for (;;) {
    code = walNextValidMsg(pWalReader, true);
    if (code == TSDB_CODE_WAL_LOG_NOT_EXIST){
      ST_TASK_DLOG("%s scan wal error:%s", __func__, tstrerror(code));
      code = TSDB_CODE_SUCCESS;
      goto end;
    }
    STREAM_CHECK_RET_GOTO(code);

    SWalCont* wCont = &pWalReader->pHead->head;
    int64_t   ver = wCont->version;
    resultRsp->ver = pWalReader->curVersion;
    resultRsp->verTime = wCont->ingestTs;
    ST_TASK_DLOG("%s scan wal ver:%" PRId64 ", type:%d, deleteData:%d, deleteTb:%d", __func__,
      ver, wCont->msgType, sStreamReaderInfo->deleteReCalc, sStreamReaderInfo->deleteOutTbl);

    if (wCont->msgType == TDMT_VND_SUBMIT) {
      // Submit payload follows an SSubmitReq2Msg header.
      void*   pSubmit = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
      int32_t submitLen = wCont->bodyLen - sizeof(SSubmitReq2Msg);
      STREAM_CHECK_RET_GOTO(scanSubmitDataPre(sStreamReaderInfo, pSubmit, submitLen, NULL, resultRsp));
    } else if (wCont->msgType == TDMT_VND_ALTER_TABLE && resultRsp->totalRows > 0) {
      resultRsp->ver--;
      break;
    } else {
      // Every other message carries its payload after an SMsgHead header.
      void*   pMetaMsg = POINTER_SHIFT(wCont->body, sizeof(SMsgHead));
      int32_t metaLen = wCont->bodyLen - sizeof(SMsgHead);
      STREAM_CHECK_RET_GOTO(processMeta(wCont->msgType, sStreamReaderInfo, pMetaMsg, metaLen, resultRsp, ver));
    }

    ST_TASK_DLOG("%s scan wal next ver:%" PRId64 ", totalRows:%d", __func__, resultRsp->ver, resultRsp->totalRows);
    if (resultRsp->totalRows >= STREAM_RETURN_ROWS_NUM) {
      break;
    }
  }
  
end:
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
1583

1584
static int32_t prepareIndexData(SWalReader* pWalReader, SStreamTriggerReaderInfo* sStreamReaderInfo, 
12,059,803✔
1585
  SArray* versions, SSHashObj* ranges, SSTriggerWalNewRsp* rsp){
1586
  int32_t      code = 0;
12,059,803✔
1587
  int32_t      lino = 0;
12,059,803✔
1588

1589
  for(int32_t i = 0; i < taosArrayGetSize(versions); i++) {
29,633,364✔
1590
    int64_t *ver = taosArrayGet(versions, i);
17,572,224✔
1591
    if (ver == NULL) continue;
17,572,224!
1592

1593
    STREAM_CHECK_RET_GOTO(walFetchHead(pWalReader, *ver));
17,572,224!
1594
    if(pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
17,573,561!
1595
      TAOS_CHECK_RETURN(walSkipFetchBody(pWalReader));
×
1596
      continue;
×
1597
    }
1598
    STREAM_CHECK_RET_GOTO(walFetchBody(pWalReader));
17,573,561!
1599

1600
    SWalCont* wCont = &pWalReader->pHead->head;
17,572,222✔
1601
    void*   pBody = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
17,572,222✔
1602
    int32_t bodyLen = wCont->bodyLen - sizeof(SSubmitReq2Msg);
17,572,222✔
1603

1604
    STREAM_CHECK_RET_GOTO(scanSubmitDataPre(sStreamReaderInfo, pBody, bodyLen, ranges, rsp));
17,572,222!
1605
  }
1606
  
1607
end:
12,059,803✔
1608
  return code;
12,059,803✔
1609
}
1610

1611
/*
 * Apply the reader's filter to resultRsp->dataBlock. When the filter removed
 * rows, the per-table slices in sStreamReaderInfo->indexHash are recomputed
 * from the filter result column.
 *
 * Returns 0 on success or the error returned by qStreamFilter().
 */
static int32_t filterData(SSTriggerWalNewRsp* resultRsp, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t          code = 0;
  int32_t          lino = 0;
  SColumnInfoData* pRet = NULL;
  SSDataBlock*     pDataBlock = (SSDataBlock*)resultRsp->dataBlock;
  int64_t          rowsBefore = pDataBlock->info.rows;

  STREAM_CHECK_RET_GOTO(qStreamFilter(pDataBlock, sStreamReaderInfo->pFilterInfo, &pRet));
  if (pDataBlock->info.rows < rowsBefore) {
    filterIndexHash(sStreamReaderInfo->indexHash, pRet);
  }

end:
  // The filter result column is owned here and released on every path.
  colDataDestroy(pRet);
  taosMemoryFree(pRet);
  return code;
}
1626

1627
static int32_t processWalVerMetaDataNew(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamReaderInfo, 
9,869,220✔
1628
                                    SSTriggerWalNewRsp* resultRsp) {
1629
  int32_t      code = 0;
9,869,220✔
1630
  int32_t      lino = 0;
9,869,220✔
1631
  void* pTask = sStreamReaderInfo->pTask;
9,869,220✔
1632
                                        
1633
  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
9,871,899✔
1634
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);
9,870,555!
1635
  resetIndexHash(sStreamReaderInfo->indexHash);
9,870,555✔
1636
  blockDataEmpty(resultRsp->dataBlock);
9,878,590✔
1637
  blockDataEmpty(resultRsp->metaBlock);
9,871,718✔
1638
  int64_t lastVer = resultRsp->ver;                                      
9,862,370✔
1639
  STREAM_CHECK_RET_GOTO(prepareIndexMetaData(pWalReader, sStreamReaderInfo, resultRsp));
9,862,463!
1640
  STREAM_CHECK_CONDITION_GOTO(resultRsp->totalRows == 0, TDB_CODE_SUCCESS);
9,865,209✔
1641

1642
  buildIndexHash(sStreamReaderInfo->indexHash, pTask);
429,920✔
1643
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(((SSDataBlock*)resultRsp->dataBlock), resultRsp->totalRows));
429,920!
1644
  while(lastVer < resultRsp->ver) {
10,666,639✔
1645
    STREAM_CHECK_RET_GOTO(walFetchHead(pWalReader, lastVer++));
10,255,473!
1646
    if(pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
10,252,800✔
1647
      TAOS_CHECK_RETURN(walSkipFetchBody(pWalReader));
369,782!
1648
      continue;
369,782✔
1649
    }
1650
    STREAM_CHECK_RET_GOTO(walFetchBody(pWalReader));
9,893,719!
1651
    SWalCont* wCont = &pWalReader->pHead->head;
9,877,651✔
1652
    void*   pBody = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
9,892,364✔
1653
    int32_t bodyLen = wCont->bodyLen - sizeof(SSubmitReq2Msg);
9,897,728✔
1654

1655
    STREAM_CHECK_RET_GOTO(scanSubmitData(pVnode, sStreamReaderInfo, pBody, bodyLen, NULL, resultRsp, wCont->version));
9,900,393!
1656
  }
1657

1658
  int32_t metaRows = resultRsp->totalRows - ((SSDataBlock*)resultRsp->dataBlock)->info.rows;
429,920✔
1659
  STREAM_CHECK_RET_GOTO(filterData(resultRsp, sStreamReaderInfo));
429,920!
1660
  resultRsp->totalRows = ((SSDataBlock*)resultRsp->dataBlock)->info.rows + metaRows;
429,920✔
1661

1662
end:
9,866,542✔
1663
  ST_TASK_DLOG("vgId:%d %s end, get result totalRows:%d, process:%"PRId64"/%"PRId64, TD_VID(pVnode), __func__, 
9,866,542✔
1664
          resultRsp->totalRows, resultRsp->ver, walGetAppliedVer(pWalReader->pWal));
1665
  walCloseReader(pWalReader);
9,867,860✔
1666
  return code;
9,869,129✔
1667
}
1668

1669
static int32_t processWalVerDataNew(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamReaderInfo, 
12,059,803✔
1670
                                    SArray* versions, SSHashObj* ranges, SSTriggerWalNewRsp* rsp) {
1671
  int32_t      code = 0;
12,059,803✔
1672
  int32_t      lino = 0;
12,059,803✔
1673

1674
  void* pTask = sStreamReaderInfo->pTask;
12,059,803✔
1675
  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
12,061,139✔
1676
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);
12,062,408!
1677
  
1678
  if (taosArrayGetSize(versions) > 0) {
12,062,408✔
1679
    rsp->ver = *(int64_t*)taosArrayGetLast(versions);
1,346,043✔
1680
  }
1681
  
1682
  resetIndexHash(sStreamReaderInfo->indexHash);
12,062,408✔
1683
  STREAM_CHECK_RET_GOTO(prepareIndexData(pWalReader, sStreamReaderInfo, versions, ranges, rsp));
12,059,803!
1684
  STREAM_CHECK_CONDITION_GOTO(rsp->totalRows == 0, TDB_CODE_SUCCESS);
12,058,467✔
1685

1686
  buildIndexHash(sStreamReaderInfo->indexHash, pTask);
1,346,043✔
1687

1688
  blockDataEmpty(rsp->dataBlock);
1,346,043✔
1689
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->dataBlock, rsp->totalRows));
1,346,043!
1690

1691
  for(int32_t i = 0; i < taosArrayGetSize(versions); i++) {
18,911,584✔
1692
    int64_t *ver = taosArrayGet(versions, i);
17,570,887✔
1693
    if (ver == NULL) continue;
17,570,889!
1694

1695
    STREAM_CHECK_RET_GOTO(walFetchHead(pWalReader, *ver));
17,570,889!
1696
    if(pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
17,562,869!
1697
      TAOS_CHECK_RETURN(walSkipFetchBody(pWalReader));
×
1698
      continue;
×
1699
    }
1700
    STREAM_CHECK_RET_GOTO(walFetchBody(pWalReader));
17,562,868!
1701
    SWalCont* wCont = &pWalReader->pHead->head;
17,554,846✔
1702
    void*   pBody = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
17,562,868✔
1703
    int32_t bodyLen = wCont->bodyLen - sizeof(SSubmitReq2Msg);
17,570,888✔
1704

1705
    STREAM_CHECK_RET_GOTO(scanSubmitData(pVnode, sStreamReaderInfo, pBody, bodyLen, ranges, rsp, wCont->version));
17,570,888!
1706
  }
1707
  // printDataBlock(rsp->dataBlock, __func__, "processWalVerDataNew");
1708
  STREAM_CHECK_RET_GOTO(filterData(rsp, sStreamReaderInfo));
1,344,706!
1709
  rsp->totalRows = ((SSDataBlock*)rsp->dataBlock)->info.rows;
1,346,043✔
1710

1711
end:
12,059,803✔
1712
  ST_TASK_DLOG("vgId:%d %s end, get result totalRows:%d, process:%"PRId64"/%"PRId64, TD_VID(pVnode), __func__, 
12,059,803✔
1713
            rsp->totalRows, rsp->ver, walGetAppliedVer(pWalReader->pWal));
1714
  walCloseReader(pWalReader);
12,059,803✔
1715
  return code;
12,062,408✔
1716
}
1717

1718
/*
 * Build an array of SSchema for the table identified by uid.
 *
 * For a child table the schema row of its super table is used; for a normal
 * table its own schema row. Any other table type is rejected with
 * TSDB_CODE_INVALID_PARA.
 *
 * pVnode   vnode whose meta store is queried
 * uid      table uid
 * schemas  out: newly created array of SSchema, owned by the caller on
 *          success; set to NULL on failure
 *
 * Returns 0 on success or an error code.
 */
static int32_t buildScheamFromMeta(SVnode* pVnode, int64_t uid, SArray** schemas) {
  int32_t code = 0;
  int32_t lino = 0;
  SMetaReader metaReader = {0};
  SStorageAPI api = {0};
  initStorageAPI(&api);
  *schemas = taosArrayInit(8, sizeof(SSchema));
  STREAM_CHECK_NULL_GOTO(*schemas, terrno);
  
  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
  STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, uid));

  SSchemaWrapper* sSchemaWrapper = NULL;
  if (metaReader.me.type == TD_CHILD_TABLE) {
    int64_t suid = metaReader.me.ctbEntry.suid;
    // The reader is reused for the super-table lookup, so its decoder is cleared first.
    tDecoderClear(&metaReader.coder);
    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, suid));
    sSchemaWrapper = &metaReader.me.stbEntry.schemaRow;
  } else if (metaReader.me.type == TD_NORMAL_TABLE) {
    sSchemaWrapper = &metaReader.me.ntbEntry.schemaRow;
  } else {
    // No schema to copy from: fail instead of dereferencing a NULL wrapper below.
    qError("invalid table type:%d", metaReader.me.type);
    code = TSDB_CODE_INVALID_PARA;
    lino = __LINE__;
    goto end;
  }

  for (size_t j = 0; j < sSchemaWrapper->nCols; j++) {
    SSchema* s = sSchemaWrapper->pSchema + j;
    STREAM_CHECK_NULL_GOTO(taosArrayPush(*schemas, s), terrno);
  }

end:
  api.metaReaderFn.clearReader(&metaReader);
  STREAM_PRINT_LOG_END(code, lino);
  if (code != 0)  {
    taosArrayDestroy(*schemas);
    *schemas = NULL;
  }
  return code;
}
1756

1757
/*
 * Restrict `schemas` to the columns listed in `cols`.
 *
 * For each column id in `cols`, the first original schema entry with that
 * colId is appended to the end of `schemas`; ids without a match are ignored.
 * Afterwards taosArrayPopFrontBatch() is called with the original length
 * (presumably dropping the original entries so only the appended ones remain).
 * Capacity is reserved up front, so the pointers into the array stay valid
 * while appending.
 *
 * Returns 0 on success or an error code; on error `schemas` may contain both
 * original and appended entries.
 */
static int32_t shrinkScheams(SArray* cols, SArray* schemas) {
  int32_t code = 0;
  int32_t lino = 0;
  size_t  origLen = taosArrayGetSize(schemas);

  STREAM_CHECK_RET_GOTO(taosArrayEnsureCap(schemas, origLen + taosArrayGetSize(cols)));

  for (size_t colIdx = 0; colIdx < taosArrayGetSize(cols); colIdx++) {
    col_id_t* pWanted = taosArrayGet(cols, colIdx);
    STREAM_CHECK_NULL_GOTO(pWanted, terrno);

    for (size_t schIdx = 0; schIdx < origLen; schIdx++) {
      SSchema* pCandidate = taosArrayGet(schemas, schIdx);
      STREAM_CHECK_NULL_GOTO(pCandidate, terrno);
      if (pCandidate->colId != *pWanted) {
        continue;
      }
      STREAM_CHECK_NULL_GOTO(taosArrayPush(schemas, pCandidate), terrno);
      break;
    }
  }
  taosArrayPopFrontBatch(schemas, origLen);

end:
  return code;
}
1779

1780
/*
 * Create a data block for table `uid` whose columns are limited to `cids`.
 *
 * The schema is loaded from meta, reduced to the requested column ids and
 * turned into a data block whose info.id.uid is set to `uid`. The WAL scan
 * that would fill the block is currently disabled, so `ver` and `window` are
 * unused and no rows are added here.
 *
 * pBlock  out: the new block, owned by the caller on success; left untouched
 *         on failure
 *
 * Returns 0 on success or the first error reported by a helper.
 */
static int32_t processWalVerDataVTable(SVnode* pVnode, SArray *cids, int64_t ver,
  int64_t uid, STimeWindow* window, SSDataBlock** pBlock) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SArray*      schemas = NULL;
  SSDataBlock* pNewBlock = NULL;

  STREAM_CHECK_RET_GOTO(buildScheamFromMeta(pVnode, uid, &schemas));
  STREAM_CHECK_RET_GOTO(shrinkScheams(cids, schemas));
  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, &pNewBlock));

  pNewBlock->info.id.uid = uid;

  // Hand the block over to the caller; clearing the local keeps the cleanup below from destroying it.
  *pBlock = pNewBlock;
  pNewBlock = NULL;

end:
  STREAM_PRINT_LOG_END(code, lino);
  blockDataDestroy(pNewBlock);
  taosArrayDestroy(schemas);
  return code;
}
1806

1807
static int32_t createTSAndCondition(int64_t start, int64_t end, SLogicConditionNode** pCond,
×
1808
                                    STargetNode* pTargetNodeTs) {
1809
  int32_t code = 0;
×
1810
  int32_t lino = 0;
×
1811

1812
  SColumnNode*         pCol = NULL;
×
1813
  SColumnNode*         pCol1 = NULL;
×
1814
  SValueNode*          pVal = NULL;
×
1815
  SValueNode*          pVal1 = NULL;
×
1816
  SOperatorNode*       op = NULL;
×
1817
  SOperatorNode*       op1 = NULL;
×
1818
  SLogicConditionNode* cond = NULL;
×
1819

1820
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pCol));
×
1821
  pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
×
1822
  pCol->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
×
1823
  pCol->node.resType.bytes = LONG_BYTES;
×
1824
  pCol->slotId = pTargetNodeTs->slotId;
×
1825
  pCol->dataBlockId = pTargetNodeTs->dataBlockId;
×
1826

1827
  STREAM_CHECK_RET_GOTO(nodesCloneNode((SNode*)pCol, (SNode**)&pCol1));
×
1828

1829
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&pVal));
×
1830
  pVal->node.resType.type = TSDB_DATA_TYPE_BIGINT;
×
1831
  pVal->node.resType.bytes = LONG_BYTES;
×
1832
  pVal->datum.i = start;
×
1833
  pVal->typeData = start;
×
1834

1835
  STREAM_CHECK_RET_GOTO(nodesCloneNode((SNode*)pVal, (SNode**)&pVal1));
×
1836
  pVal1->datum.i = end;
×
1837
  pVal1->typeData = end;
×
1838

1839
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&op));
×
1840
  op->opType = OP_TYPE_GREATER_EQUAL;
×
1841
  op->node.resType.type = TSDB_DATA_TYPE_BOOL;
×
1842
  op->node.resType.bytes = CHAR_BYTES;
×
1843
  op->pLeft = (SNode*)pCol;
×
1844
  op->pRight = (SNode*)pVal;
×
1845
  pCol = NULL;
×
1846
  pVal = NULL;
×
1847

1848
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&op1));
×
1849
  op1->opType = OP_TYPE_LOWER_EQUAL;
×
1850
  op1->node.resType.type = TSDB_DATA_TYPE_BOOL;
×
1851
  op1->node.resType.bytes = CHAR_BYTES;
×
1852
  op1->pLeft = (SNode*)pCol1;
×
1853
  op1->pRight = (SNode*)pVal1;
×
1854
  pCol1 = NULL;
×
1855
  pVal1 = NULL;
×
1856

1857
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_LOGIC_CONDITION, (SNode**)&cond));
×
1858
  cond->condType = LOGIC_COND_TYPE_AND;
×
1859
  cond->node.resType.type = TSDB_DATA_TYPE_BOOL;
×
1860
  cond->node.resType.bytes = CHAR_BYTES;
×
1861
  STREAM_CHECK_RET_GOTO(nodesMakeList(&cond->pParameterList));
×
1862
  STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)op));
×
1863
  op = NULL;
×
1864
  STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)op1));
×
1865
  op1 = NULL;
×
1866

1867
  *pCond = cond;
×
1868

1869
end:
×
1870
  if (code != 0) {
×
1871
    nodesDestroyNode((SNode*)pCol);
×
1872
    nodesDestroyNode((SNode*)pCol1);
×
1873
    nodesDestroyNode((SNode*)pVal);
×
1874
    nodesDestroyNode((SNode*)pVal1);
×
1875
    nodesDestroyNode((SNode*)op);
×
1876
    nodesDestroyNode((SNode*)op1);
×
1877
    nodesDestroyNode((SNode*)cond);
×
1878
  }
1879
  STREAM_PRINT_LOG_END(code, lino);
×
1880

1881
  return code;
×
1882
}
1883

1884
/*
1885
static int32_t createExternalConditions(SStreamRuntimeFuncInfo* data, SLogicConditionNode** pCond, STargetNode* pTargetNodeTs, STimeRangeNode* node) {
1886
  int32_t              code = 0;
1887
  int32_t              lino = 0;
1888
  SLogicConditionNode* pAndCondition = NULL;
1889
  SLogicConditionNode* cond = NULL;
1890

1891
  if (pTargetNodeTs == NULL) {
1892
    vError("stream reader %s no ts column", __func__);
1893
    return TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN;
1894
  }
1895
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_LOGIC_CONDITION, (SNode**)&cond));
1896
  cond->condType = LOGIC_COND_TYPE_OR;
1897
  cond->node.resType.type = TSDB_DATA_TYPE_BOOL;
1898
  cond->node.resType.bytes = CHAR_BYTES;
1899
  STREAM_CHECK_RET_GOTO(nodesMakeList(&cond->pParameterList));
1900

1901
  for (int i = 0; i < taosArrayGetSize(data->pStreamPesudoFuncVals); ++i) {
1902
    data->curIdx = i;
1903

1904
    SReadHandle handle = {0};
1905
    calcTimeRange(node, data, &handle.winRange, &handle.winRangeValid);
1906
    if (!handle.winRangeValid) {
1907
      stError("stream reader %s invalid time range, skey:%" PRId64 ", ekey:%" PRId64, __func__, handle.winRange.skey,
1908
              handle.winRange.ekey);
1909
      continue;
1910
    }
1911
    STREAM_CHECK_RET_GOTO(createTSAndCondition(handle.winRange.skey, handle.winRange.ekey, &pAndCondition, pTargetNodeTs));
1912
    stDebug("%s create condition skey:%" PRId64 ", eksy:%" PRId64, __func__, handle.winRange.skey, handle.winRange.ekey);
1913
    STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)pAndCondition));
1914
    pAndCondition = NULL;
1915
  }
1916

1917
  *pCond = cond;
1918

1919
end:
1920
  if (code != 0) {
1921
    nodesDestroyNode((SNode*)pAndCondition);
1922
    nodesDestroyNode((SNode*)cond);
1923
  }
1924
  STREAM_PRINT_LOG_END(code, lino);
1925

1926
  return code;
1927
}
1928
*/
1929

1930
static int32_t processCalaTimeRange(SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo, SResFetchReq* req,
1,202,946✔
1931
                                    STimeRangeNode* node, SReadHandle* handle, bool isExtWin) {
1932
  int32_t code = 0;
1,202,946✔
1933
  int32_t lino = 0;
1,202,946✔
1934
  void* pTask = sStreamReaderCalcInfo->pTask;
1,202,946✔
1935
  STimeWindow* pWin = isExtWin ? &handle->extWinRange : &handle->winRange;
1,202,946!
1936
  bool* pValid = isExtWin ? &handle->extWinRangeValid : &handle->winRangeValid;
1,202,946!
1937
  
1938
  if (req->pStRtFuncInfo->withExternalWindow) {
1,202,946✔
1939
    sStreamReaderCalcInfo->tmpRtFuncInfo.curIdx = 0;
450,380✔
1940
    sStreamReaderCalcInfo->tmpRtFuncInfo.triggerType = req->pStRtFuncInfo->triggerType;
450,380✔
1941
    
1942
    SSTriggerCalcParam* pFirst = taosArrayGet(req->pStRtFuncInfo->pStreamPesudoFuncVals, 0);
450,380✔
1943
    SSTriggerCalcParam* pLast = taosArrayGetLast(req->pStRtFuncInfo->pStreamPesudoFuncVals);
450,380✔
1944
    STREAM_CHECK_NULL_GOTO(pFirst, terrno);
450,380!
1945
    STREAM_CHECK_NULL_GOTO(pLast, terrno);
450,380!
1946

1947
    if (!node->needCalc) {
450,380✔
1948
      pWin->skey = pFirst->wstart;
301,749✔
1949
      pWin->ekey = pLast->wend;
301,749✔
1950
      *pValid = true;
301,749✔
1951
      if (req->pStRtFuncInfo->triggerType == STREAM_TRIGGER_SLIDING) {
301,749✔
1952
        pWin->ekey--;
161,770✔
1953
      }
1954
    } else {
1955
      SSTriggerCalcParam* pTmp = taosArrayGet(sStreamReaderCalcInfo->tmpRtFuncInfo.pStreamPesudoFuncVals, 0);
148,631✔
1956
      memcpy(pTmp, pFirst, sizeof(*pTmp));
148,631!
1957

1958
      STREAM_CHECK_RET_GOTO(streamCalcCurrWinTimeRange(node, &sStreamReaderCalcInfo->tmpRtFuncInfo, pWin, pValid, 1));
148,631!
1959
      if (*pValid) {
148,631!
1960
        int64_t skey = pWin->skey;
148,631✔
1961

1962
        memcpy(pTmp, pLast, sizeof(*pTmp));
148,631!
1963
        STREAM_CHECK_RET_GOTO(streamCalcCurrWinTimeRange(node, &sStreamReaderCalcInfo->tmpRtFuncInfo, pWin, pValid, 2));
148,631!
1964

1965
        if (*pValid) {
148,631!
1966
          pWin->skey = skey;
148,631✔
1967
        }
1968
      }
1969
      pWin->ekey--;
148,631✔
1970
    }
1971
  } else {
1972
    if (!node->needCalc) {
752,566!
1973
      SSTriggerCalcParam* pCurr = taosArrayGet(req->pStRtFuncInfo->pStreamPesudoFuncVals, req->pStRtFuncInfo->curIdx);
143,446✔
1974
      pWin->skey = pCurr->wstart;
143,446✔
1975
      pWin->ekey = pCurr->wend;
143,446✔
1976
      *pValid = true;
143,446✔
1977
      if (req->pStRtFuncInfo->triggerType == STREAM_TRIGGER_SLIDING) {
143,446✔
1978
        pWin->ekey--;
126,934✔
1979
      }
1980
    } else {
1981
      STREAM_CHECK_RET_GOTO(streamCalcCurrWinTimeRange(node, req->pStRtFuncInfo, pWin, pValid, 3));
609,120!
1982
      pWin->ekey--;
609,120✔
1983
    }
1984
  }
1985

1986
  ST_TASK_DLOG("%s type:%s, withExternalWindow:%d, skey:%" PRId64 ", ekey:%" PRId64 ", validRange:%d", 
1,202,946!
1987
      __func__, isExtWin ? "interp range" : "scan time range", req->pStRtFuncInfo->withExternalWindow, pWin->skey, pWin->ekey, *pValid);
1988

1989
end:
42,812✔
1990

1991
  if (code) {
1,202,946!
1992
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1993
  }
1994
  
1995
  return code;
1,202,946✔
1996
}
1997

1998
static int32_t processTs(SVnode* pVnode, SStreamTsResponse* tsRsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
1,357,778✔
1999
                                  SStreamReaderTaskInner* pTaskInner) {
2000
  int32_t code = 0;
1,357,778✔
2001
  int32_t lino = 0;
1,357,778✔
2002

2003
  void* pTask = sStreamReaderInfo->pTask;
1,357,778✔
2004
  tsRsp->tsInfo = taosArrayInit(qStreamGetTableListGroupNum(pTaskInner->pTableList), sizeof(STsInfo));
1,357,778✔
2005
  STREAM_CHECK_NULL_GOTO(tsRsp->tsInfo, terrno);
1,357,778!
2006
  while (true) {
3,089,203✔
2007
    bool hasNext = false;
4,448,317✔
2008
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
4,446,981!
2009
    if (hasNext) {
4,441,671!
2010
      pTaskInner->api.tsdReader.tsdReaderReleaseDataBlock(pTaskInner->pReader);
1,303,607✔
2011
      STsInfo* tsInfo = taosArrayReserve(tsRsp->tsInfo, 1);
1,302,264✔
2012
      STREAM_CHECK_NULL_GOTO(tsInfo, terrno)
1,303,607!
2013
      if (pTaskInner->options.order == TSDB_ORDER_ASC) {
1,303,607✔
2014
        tsInfo->ts = pTaskInner->pResBlock->info.window.skey;
787,802✔
2015
      } else {
2016
        tsInfo->ts = pTaskInner->pResBlock->info.window.ekey;
515,805✔
2017
      }
2018
      tsInfo->gId = (sStreamReaderInfo->groupByTbname || sStreamReaderInfo->tableType != TSDB_SUPER_TABLE) ? 
2,789,873!
2019
                    pTaskInner->pResBlock->info.id.uid : qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
1,484,937✔
2020
      ST_TASK_DLOG("vgId:%d %s get ts:%" PRId64 ", gId:%" PRIu64 ", ver:%" PRId64, TD_VID(pVnode), __func__, tsInfo->ts,
1,303,607✔
2021
              tsInfo->gId, tsRsp->ver);
2022
    }
2023
    
2024
    pTaskInner->currentGroupIndex++;
4,443,014✔
2025
    if (pTaskInner->currentGroupIndex >= qStreamGetTableListGroupNum(pTaskInner->pTableList) || pTaskInner->options.gid != 0) {
4,445,664✔
2026
      break;
2027
    }
2028
    STREAM_CHECK_RET_GOTO(resetTsdbReader(pTaskInner));
3,086,554!
2029
  }
2030

2031
end:
1,356,461✔
2032
  STREAM_PRINT_LOG_END_WITHID(code, lino);
1,356,461!
2033
  return code;
1,357,778✔
2034
}
2035

2036
/*
 * Handle a "set table" pull request.
 *
 * Swaps the trigger/calc uid hashes carried by the request into the reader
 * info (the request ends up holding whatever the reader info held before),
 * rejects the request with TSDB_CODE_INVALID_PARA when either swapped-in hash
 * is NULL, and otherwise marks the reader as a virtual-table stream grouped
 * by table name. A response with an empty payload is always sent.
 */
static int32_t vnodeProcessStreamSetTableReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;  // never assigned in this handler: the reply carries no body
  size_t  size = 0;

  // NOTE(review): pTask is declared after this check, so if the check jumps to `end`
  // the variable is uninitialised there - confirm the end-of-function log macro tolerates that.
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;

  ST_TASK_DLOG("vgId:%d %s start, trigger hash size:%d, calc hash size:%d", TD_VID(pVnode), __func__,
                tSimpleHashGetSize(req->setTableReq.uidInfoTrigger), tSimpleHashGetSize(req->setTableReq.uidInfoCalc));

  TSWAP(sStreamReaderInfo->uidHashTrigger, req->setTableReq.uidInfoTrigger);
  TSWAP(sStreamReaderInfo->uidHashCalc, req->setTableReq.uidInfoCalc);
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidHashTrigger, TSDB_CODE_INVALID_PARA);
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidHashCalc, TSDB_CODE_INVALID_PARA);

  sStreamReaderInfo->isVtableStream = true;
  sStreamReaderInfo->groupByTbname = true;

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);

  SRpcMsg rsp = {0};
  rsp.msgType = TDMT_STREAM_TRIGGER_PULL_RSP;
  rsp.info = pMsg->info;
  rsp.pCont = buf;
  rsp.contLen = size;
  rsp.code = code;
  tmsgSendRsp(&rsp);

  return code;
}
2061

2062
/*
 * Handle a "last ts" pull request.
 *
 * Builds a descending, group-by-group scan over the whole time range
 * (INT64_MIN..INT64_MAX) restricted to the trigger uid hash, gathers one
 * timestamp per group through processTs(), and replies with the serialized
 * result. The reported version is the vnode's applied version plus one.
 * The temporary reader task and the ts array are released before returning.
 */
static int32_t vnodeProcessStreamLastTsReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  SStreamReaderTaskInner* pTaskInner = NULL;
  SStreamTsResponse       lastTsRsp = {0};
  void*                   buf = NULL;
  size_t                  size = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;

  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  BUILD_OPTION(options, sStreamReaderInfo, -1, TSDB_ORDER_DESC, INT64_MIN, INT64_MAX, sStreamReaderInfo->tsSchemas, true,
               STREAM_SCAN_GROUP_ONE_BY_ONE, 0, true, sStreamReaderInfo->uidHashTrigger);
  SStorageAPI api = {0};
  initStorageAPI(&api);
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, &api));

  lastTsRsp.ver = pVnode->state.applied + 1;

  STREAM_CHECK_RET_GOTO(processTs(pVnode, &lastTsRsp, sStreamReaderInfo, pTaskInner));
  ST_TASK_DLOG("vgId:%d %s get result, ver:%" PRId64, TD_VID(pVnode), __func__, lastTsRsp.ver);
  STREAM_CHECK_RET_GOTO(buildTsRsp(&lastTsRsp, &buf, &size));

  // Dump every collected entry, but only walk the array when debug logging is enabled.
  if (stDebugFlag & DEBUG_DEBUG) {
    int32_t numOfTs = taosArrayGetSize(lastTsRsp.tsInfo);
    int32_t idx = 0;
    while (idx < numOfTs) {
      STsInfo* tsInfo = TARRAY_GET_ELEM(lastTsRsp.tsInfo, idx);
      ST_TASK_DLOG("vgId:%d %s get ts:%" PRId64 ", gId:%" PRIu64, TD_VID(pVnode), __func__, tsInfo->ts, tsInfo->gId);
      idx++;
    }
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);

  SRpcMsg rsp = {0};
  rsp.msgType = TDMT_STREAM_TRIGGER_PULL_RSP;
  rsp.info = pMsg->info;
  rsp.pCont = buf;
  rsp.contLen = size;
  rsp.code = code;
  tmsgSendRsp(&rsp);

  taosArrayDestroy(lastTsRsp.tsInfo);
  releaseStreamTask(&pTaskInner);
  return code;
}
2103

2104
/*
 * Handle a "first ts" pull request.
 *
 * Builds an ascending, group-by-group scan starting at the requested
 * startTime (open-ended up to INT64_MAX) for the requested version and gid,
 * restricted to the trigger uid hash, gathers one timestamp per group through
 * processTs(), and replies with the serialized result. The reported version
 * is the vnode's applied version. The temporary reader task and the ts array
 * are released before returning.
 */
static int32_t vnodeProcessStreamFirstTsReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  SStreamReaderTaskInner* pTaskInner = NULL;
  SStreamTsResponse       firstTsRsp = {0};
  void*                   buf = NULL;
  size_t                  size = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, startTime:%"PRId64" ver:%"PRId64" gid:%"PRId64, TD_VID(pVnode), __func__, req->firstTsReq.startTime, req->firstTsReq.ver, req->firstTsReq.gid);

  BUILD_OPTION(options, sStreamReaderInfo, req->firstTsReq.ver, TSDB_ORDER_ASC, req->firstTsReq.startTime, INT64_MAX, sStreamReaderInfo->tsSchemas, true,
               STREAM_SCAN_GROUP_ONE_BY_ONE, req->firstTsReq.gid, true, sStreamReaderInfo->uidHashTrigger);
  SStorageAPI api = {0};
  initStorageAPI(&api);
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, &api));

  firstTsRsp.ver = pVnode->state.applied;
  STREAM_CHECK_RET_GOTO(processTs(pVnode, &firstTsRsp, sStreamReaderInfo, pTaskInner));

  ST_TASK_DLOG("vgId:%d %s get result size:%"PRIzu", ver:%"PRId64, TD_VID(pVnode), __func__, taosArrayGetSize(firstTsRsp.tsInfo), firstTsRsp.ver);
  STREAM_CHECK_RET_GOTO(buildTsRsp(&firstTsRsp, &buf, &size));

  // Dump every collected entry, but only walk the array when debug logging is enabled.
  if (stDebugFlag & DEBUG_DEBUG) {
    int32_t numOfTs = taosArrayGetSize(firstTsRsp.tsInfo);
    int32_t idx = 0;
    while (idx < numOfTs) {
      STsInfo* tsInfo = TARRAY_GET_ELEM(firstTsRsp.tsInfo, idx);
      ST_TASK_DLOG("vgId:%d %s get ts:%" PRId64 ", gId:%" PRIu64, TD_VID(pVnode), __func__, tsInfo->ts, tsInfo->gId);
      idx++;
    }
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);

  SRpcMsg rsp = {0};
  rsp.msgType = TDMT_STREAM_TRIGGER_PULL_RSP;
  rsp.info = pMsg->info;
  rsp.pCont = buf;
  rsp.contLen = size;
  rsp.code = code;
  tmsgSendRsp(&rsp);

  taosArrayDestroy(firstTsRsp.tsInfo);
  releaseStreamTask(&pTaskInner);
  return code;
}
2143

2144
/*
 * Handle a TSDB meta pull request (initial request or a continuation).
 *
 * For an initial STRIGGER_PULL_TSDB_META request a reader task is created,
 * registered in streamTaskMap under the session key and given a destination
 * block for meta rows. For any other request type the task is looked up in
 * the map (TSDB_CODE_STREAM_NO_CONTEXT when it is missing).
 *
 * Each data block reported by the reader contributes one meta row:
 * skey, ekey, uid, [groupId - only for non-virtual-table streams], rows.
 * At most STREAM_RETURN_ROWS_NUM rows are returned per call; once the reader
 * is exhausted the task is removed from the map.
 *
 * A response is always sent; the return value equals the response code.
 */
static int32_t vnodeProcessStreamTsdbMetaReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStreamReaderTaskInner* pTaskInner = NULL;
  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_META);

  if (req->base.type == STRIGGER_PULL_TSDB_META) {
    // A non-zero gid restricts the scan to that group; otherwise every table is scanned.
    BUILD_OPTION(options, sStreamReaderInfo, req->tsdbMetaReq.ver, req->tsdbMetaReq.order, req->tsdbMetaReq.startTime, req->tsdbMetaReq.endTime, sStreamReaderInfo->tsSchemas, true, 
      (req->tsdbMetaReq.gid != 0 ? STREAM_SCAN_GROUP_ONE_BY_ONE : STREAM_SCAN_ALL), req->tsdbMetaReq.gid, true, sStreamReaderInfo->uidHashTrigger);
    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, &api));
    // NOTE(review): if this put fails the freshly created task is not released here - confirm who owns it.
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
    
    STREAM_CHECK_RET_GOTO(createBlockForTsdbMeta(&pTaskInner->pResBlockDst, sStreamReaderInfo->isVtableStream));
  } else {
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  }

  blockDataCleanup(pTaskInner->pResBlockDst);
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pTaskInner->pResBlockDst, STREAM_RETURN_ROWS_NUM));
  bool hasNext = true;
  while (true) {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
    if (!hasNext) {
      break;
    }
    // Only block-level info is reported, so the data block itself is released immediately.
    pTaskInner->api.tsdReader.tsdReaderReleaseDataBlock(pTaskInner->pReader);
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);

    int32_t index = 0;
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.window.skey));
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.window.ekey));
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.id.uid));
    if (!sStreamReaderInfo->isVtableStream) {
      // The group id column is only filled for non-virtual-table streams.
      STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.id.groupId));
    }
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.rows));

    // Logged through the task-aware macro, like every other trace in this handler.
    ST_TASK_DLOG("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
    pTaskInner->pResBlockDst->info.rows++;
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
      // Page is full: hasNext stays true so the task is kept for a continuation request.
      break;
    }
  }

  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
  printDataBlock(pTaskInner->pResBlockDst, __func__, "meta", ((SStreamTask *)sStreamReaderInfo->pTask)->streamId);
  if (!hasNext) {
    // Reader exhausted: drop the cached task for this session.
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  return code;
}
2216

2217
/*
 * Handle a TSDB ts-data pull request for a non-virtual-table stream.
 *
 * Scans the requested uid over [skey, ekey] at the requested version using
 * the trigger columns, fills tags for non-empty blocks, applies the task
 * filter, merges every block into the task's destination block, then
 * transforms that into a block shaped like tsBlock and sends it back.
 * The reader task and the result block are released before returning.
 */
static int32_t vnodeProcessStreamTsdbTsDataReqNonVTable(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  SStreamReaderTaskInner* pTaskInner = NULL;
  void*                   buf = NULL;
  size_t                  size = 0;
  SSDataBlock*            pBlockRes = NULL;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, ver:%"PRId64",skey:%"PRId64",ekey:%"PRId64",uid:%"PRId64",suid:%"PRId64, TD_VID(pVnode), __func__, req->tsdbTsDataReq.ver, 
                req->tsdbTsDataReq.skey, req->tsdbTsDataReq.ekey, 
                req->tsdbTsDataReq.uid, req->tsdbTsDataReq.suid);

  BUILD_OPTION(options, sStreamReaderInfo, req->tsdbTsDataReq.ver, TSDB_ORDER_ASC, req->tsdbTsDataReq.skey, req->tsdbTsDataReq.ekey,
               sStreamReaderInfo->triggerCols, false, STREAM_SCAN_ALL, 0, true, NULL);
  options.uid = req->tsdbTsDataReq.uid;
  SStorageAPI api = {0};
  initStorageAPI(&api);
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, &api));
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->tsBlock, false, &pBlockRes));

  for (;;) {
    bool hasMore = false;
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasMore));
    if (!hasMore) {
      break;
    }

    if (!sStreamReaderInfo->isVtableStream) {
      pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
    }

    SSDataBlock* pBlock = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));

    // Tags are only filled in when the reader actually produced rows.
    if (pBlock != NULL && pBlock->info.rows > 0) {
      STREAM_CHECK_RET_GOTO(processTag(pVnode, sStreamReaderInfo, false, &api, pBlock->info.id.uid, pBlock,
          0, pBlock->info.rows, 1));
    }

    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, pTaskInner->pFilterInfo, NULL));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
    ST_TASK_DLOG("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
  }

  blockDataTransform(pBlockRes, pTaskInner->pResBlockDst);

  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);

  SRpcMsg rsp = {0};
  rsp.msgType = TDMT_STREAM_TRIGGER_PULL_RSP;
  rsp.info = pMsg->info;
  rsp.pCont = buf;
  rsp.contLen = size;
  rsp.code = code;
  tmsgSendRsp(&rsp);

  blockDataDestroy(pBlockRes);
  releaseStreamTask(&pTaskInner);
  return code;
}
2279

2280
/*
 * Handle a TSDB ts-data pull request for a virtual-table stream.
 *
 * Scans the requested (suid, uid) over [skey, ekey] at the requested version
 * using the timestamp schemas, merges every block the reader returns into a
 * result block shaped like tsBlock, and sends it back. The reader task and
 * the result block are released before returning.
 *
 * A response is always sent; the return value equals the response code.
 */
static int32_t vnodeProcessStreamTsdbTsDataReqVTable(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  SStreamReaderTaskInner* pTaskInner = NULL;
  void*                   buf = NULL;
  size_t                  size = 0;
  SSDataBlock*            pBlockRes = NULL;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  // Routine start trace: debug level, matching the non-virtual-table handler
  // (it was previously emitted at error level on every request).
  ST_TASK_DLOG("vgId:%d %s start, ver:%"PRId64",skey:%"PRId64",ekey:%"PRId64",uid:%"PRId64",suid:%"PRId64, TD_VID(pVnode), __func__, req->tsdbTsDataReq.ver, 
                req->tsdbTsDataReq.skey, req->tsdbTsDataReq.ekey, 
                req->tsdbTsDataReq.uid, req->tsdbTsDataReq.suid);

  BUILD_OPTION(options, sStreamReaderInfo, req->tsdbTsDataReq.ver, TSDB_ORDER_ASC, req->tsdbTsDataReq.skey, req->tsdbTsDataReq.ekey,
               sStreamReaderInfo->tsSchemas, true, STREAM_SCAN_ALL, 0, true, NULL);
  options.suid = req->tsdbTsDataReq.suid;
  options.uid = req->tsdbTsDataReq.uid;
  SStorageAPI api = {0};
  initStorageAPI(&api);
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->tsBlock, &api));
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->tsBlock, false, &pBlockRes));

  while (1) {
    bool hasNext = false;
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
    if (!hasNext) {
      break;
    }

    SSDataBlock* pBlock = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pBlockRes, pBlock));
    ST_TASK_DLOG("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
            TD_VID(pVnode), __func__, pBlockRes->info.window.skey, pBlockRes->info.window.ekey,
            pBlockRes->info.id.uid, pBlockRes->info.id.groupId, pBlockRes->info.rows);
  }

  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlockRes->info.rows);
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  blockDataDestroy(pBlockRes);

  releaseStreamTask(&pTaskInner);
  return code;
}
2331

2332
/*
 * Handle a TSDB trigger-data pull request (initial request or continuation).
 *
 * An initial STRIGGER_PULL_TSDB_TRIGGER_DATA request creates a reader task
 * (open-ended scan from startTime), caches it in streamTaskMap under the
 * session key and gives it a destination block; any other request type
 * resumes the cached task (TSDB_CODE_STREAM_NO_CONTEXT when it is missing).
 *
 * As written, each call returns the data of at most one reader block: the
 * loop leaves as soon as one block has been merged. When the reader is
 * exhausted the cached task is removed from the map.
 */
static int32_t vnodeProcessStreamTsdbTriggerDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  SStreamReaderTaskInner* pTaskInner = NULL;
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start. ver:%"PRId64",order:%d,startTs:%"PRId64",gid:%"PRId64, TD_VID(pVnode), __func__, req->tsdbTriggerDataReq.ver, req->tsdbTriggerDataReq.order, req->tsdbTriggerDataReq.startTime, req->tsdbTriggerDataReq.gid);

  int64_t sessionKey = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_TRIGGER_DATA);

  if (req->base.type != STRIGGER_PULL_TSDB_TRIGGER_DATA) {
    // Continuation: pick up the task cached by the initial request.
    void** ppCached = taosHashGet(sStreamReaderInfo->streamTaskMap, &sessionKey, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(ppCached, TSDB_CODE_STREAM_NO_CONTEXT);
    pTaskInner = *(SStreamReaderTaskInner**)ppCached;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  } else {
    // Initial request: a non-zero gid restricts the scan to that group.
    BUILD_OPTION(options, sStreamReaderInfo, req->tsdbTriggerDataReq.ver, req->tsdbTriggerDataReq.order, req->tsdbTriggerDataReq.startTime, INT64_MAX,
                 sStreamReaderInfo->triggerCols, false, (req->tsdbTriggerDataReq.gid != 0 ? STREAM_SCAN_GROUP_ONE_BY_ONE : STREAM_SCAN_ALL), 
                 req->tsdbTriggerDataReq.gid, true, NULL);
    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, &api));

    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &sessionKey, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
  }

  blockDataCleanup(pTaskInner->pResBlockDst);
  bool hasMore = true;
  for (;;) {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasMore));
    if (!hasMore) {
      break;
    }
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
    pTaskInner->pResBlockDst->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);

    SSDataBlock* pBlock = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
    // Tags are only filled in when the reader actually produced rows.
    if (pBlock != NULL && pBlock->info.rows > 0) {
      STREAM_CHECK_RET_GOTO(
        processTag(pVnode, sStreamReaderInfo, false, &pTaskInner->api, pBlock->info.id.uid, pBlock, 0, pBlock->info.rows, 1));
    }
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, pTaskInner->pFilterInfo, NULL));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
    ST_TASK_DLOG("vgId:%d %s get skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
    if (pTaskInner->pResBlockDst->info.rows >= 0) { //todo
      break;
    }
  }

  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
  if (!hasMore) {
    // Reader exhausted: drop the cached task for this session.
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &sessionKey, LONG_BYTES));
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);

  SRpcMsg rsp = {0};
  rsp.msgType = TDMT_STREAM_TRIGGER_PULL_RSP;
  rsp.info = pMsg->info;
  rsp.pCont = buf;
  rsp.contLen = size;
  rsp.code = code;
  tmsgSendRsp(&rsp);

  return code;
}
2402

2403
/*
 * Handle a TSDB calc-data pull request (initial request or continuation).
 *
 * Requires the reader to have trigger columns
 * (TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN otherwise). An initial
 * STRIGGER_PULL_TSDB_CALC_DATA request creates an ascending scan of the
 * requested gid over [skey, ekey], caches the task in streamTaskMap under the
 * session key and gives it a destination block; any other request type
 * resumes the cached task.
 *
 * Filtered blocks are merged until the reader is exhausted or
 * STREAM_RETURN_ROWS_NUM rows are collected; the result is transformed into a
 * block shaped like calcResBlock and sent back. When the reader is exhausted
 * the cached task is removed from the map.
 */
static int32_t vnodeProcessStreamTsdbCalcDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t      code = 0;
  int32_t      lino = 0;
  void*        buf = NULL;
  size_t       size = 0;
  SSDataBlock* pBlockRes = NULL;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, skey:%"PRId64",ekey:%"PRId64",gid:%"PRId64",ver:%"PRId64, TD_VID(pVnode), __func__, 
    req->tsdbCalcDataReq.skey, req->tsdbCalcDataReq.ekey, req->tsdbCalcDataReq.gid, req->tsdbCalcDataReq.ver);

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->triggerCols, TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN);

  SStreamReaderTaskInner* pTaskInner = NULL;
  int64_t                 sessionKey = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_CALC_DATA);

  if (req->base.type != STRIGGER_PULL_TSDB_CALC_DATA) {
    // Continuation: pick up the task cached by the initial request.
    void** ppCached = taosHashGet(sStreamReaderInfo->streamTaskMap, &sessionKey, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(ppCached, TSDB_CODE_STREAM_NO_CONTEXT);
    pTaskInner = *(SStreamReaderTaskInner**)ppCached;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  } else {
    BUILD_OPTION(options, sStreamReaderInfo, req->tsdbCalcDataReq.ver, TSDB_ORDER_ASC, req->tsdbCalcDataReq.skey, req->tsdbCalcDataReq.ekey,
                 sStreamReaderInfo->triggerCols, false, STREAM_SCAN_GROUP_ONE_BY_ONE, req->tsdbCalcDataReq.gid, true, NULL);
    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, &api));

    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &sessionKey, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
  }

  blockDataCleanup(pTaskInner->pResBlockDst);
  bool hasMore = true;
  do {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasMore));
    if (!hasMore) {
      break;
    }
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);

    SSDataBlock* pBlock = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, pTaskInner->pFilterInfo, NULL));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
    // Keep pulling blocks until the page is full; hasMore then stays true for a continuation.
  } while (pTaskInner->pResBlockDst->info.rows < STREAM_RETURN_ROWS_NUM);

  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->calcResBlock, false, &pBlockRes));
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlockRes, pTaskInner->pResBlockDst->info.capacity));
  blockDataTransform(pBlockRes, pTaskInner->pResBlockDst);
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));
  printDataBlock(pBlockRes, __func__, "tsdb_calc_data", ((SStreamTask*)pTask)->streamId);
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlockRes->info.rows);
  if (!hasMore) {
    // Reader exhausted: drop the cached task for this session.
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &sessionKey, LONG_BYTES));
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);

  SRpcMsg rsp = {0};
  rsp.msgType = TDMT_STREAM_TRIGGER_PULL_RSP;
  rsp.info = pMsg->info;
  rsp.pCont = buf;
  rsp.contLen = size;
  rsp.code = code;
  tmsgSendRsp(&rsp);

  blockDataDestroy(pBlockRes);
  return code;
}
2472

2473
/*
 * Handle a TSDB data pull request addressed to one table by uid
 * (initial request or continuation). The cached task is keyed by that uid.
 *
 * Initial STRIGGER_PULL_TSDB_DATA request:
 *   - builds slotIdList so that, for each requested column id, the entry at
 *     the column's position in the sorted cid list holds its position in the
 *     request order;
 *   - builds the table schema from meta, shrinks it to the requested cids,
 *     creates the reader task and caches it in streamTaskMap;
 *   - re-initialises the query condition with the sorted schemas and the
 *     slot list, opens a tsdb reader on the single table and creates the
 *     destination block.
 * Any other request type resumes the cached task
 * (TSDB_CODE_STREAM_NO_CONTEXT when it is missing).
 *
 * Blocks are merged until the reader is exhausted or STREAM_RETURN_ROWS_NUM
 * rows are collected; when exhausted the cached task is removed.
 */
static int32_t vnodeProcessStreamTsdbVirtalDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t  code = 0;
  int32_t  lino = 0;
  void*    buf = NULL;
  size_t   size = 0;
  int32_t* slotIdList = NULL;
  SArray*  sortedCid = NULL;
  SArray*  schemas = NULL;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, skey:%"PRId64",ekey:%"PRId64",uid:%"PRId64",ver:%"PRId64, TD_VID(pVnode), __func__, 
    req->tsdbDataReq.skey, req->tsdbDataReq.ekey, req->tsdbDataReq.uid, req->tsdbDataReq.ver);

  SStreamReaderTaskInner* pTaskInner = NULL;
  int64_t                 key = req->tsdbDataReq.uid;

  if (req->base.type != STRIGGER_PULL_TSDB_DATA) {
    // Continuation: pick up the task cached by the initial request.
    void** ppCached = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(ppCached, TSDB_CODE_STREAM_NO_CONTEXT);
    pTaskInner = *(SStreamReaderTaskInner**)ppCached;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  } else {
    // Map each sorted column position back to its position in the request.
    slotIdList = taosMemoryMalloc(taosArrayGetSize(req->tsdbDataReq.cids) * sizeof(int32_t));
    STREAM_CHECK_NULL_GOTO(slotIdList, terrno);
    sortedCid = taosArrayDup(req->tsdbDataReq.cids, NULL);
    STREAM_CHECK_NULL_GOTO(sortedCid, terrno);
    taosArraySort(sortedCid, sortCid);

    for (int32_t reqPos = 0; reqPos < taosArrayGetSize(req->tsdbDataReq.cids); reqPos++) {
      int16_t* pReqCid = taosArrayGet(req->tsdbDataReq.cids, reqPos);
      STREAM_CHECK_NULL_GOTO(pReqCid, terrno);

      for (int32_t sortedPos = 0; sortedPos < taosArrayGetSize(sortedCid); sortedPos++) {
        int16_t* pSortedCid = taosArrayGet(sortedCid, sortedPos);
        STREAM_CHECK_NULL_GOTO(pSortedCid, terrno);
        if (*pReqCid != *pSortedCid) {
          continue;
        }
        slotIdList[sortedPos] = reqPos;
        break;
      }
    }

    STREAM_CHECK_RET_GOTO(buildScheamFromMeta(pVnode, req->tsdbDataReq.uid, &schemas));
    STREAM_CHECK_RET_GOTO(shrinkScheams(req->tsdbDataReq.cids, schemas));
    BUILD_OPTION(options, sStreamReaderInfo, req->tsdbDataReq.ver, req->tsdbDataReq.order, req->tsdbDataReq.skey,
                    req->tsdbDataReq.ekey, schemas, true, STREAM_SCAN_ALL, 0, false, NULL);

    options.suid = req->tsdbDataReq.suid;
    options.uid = req->tsdbDataReq.uid;

    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, &api));
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));

    // Rebuild the query condition against the sorted schemas, then open a reader on this one table.
    STableKeyInfo keyInfo = {.uid = req->tsdbDataReq.uid};
    cleanupQueryTableDataCond(&pTaskInner->cond);
    taosArraySort(pTaskInner->options.schemas, sortSSchema);

    // NOTE(review): slotIdList is passed by address - presumably the callee may take it over; confirm.
    STREAM_CHECK_RET_GOTO(qStreamInitQueryTableDataCond(&pTaskInner->cond, pTaskInner->options.order, pTaskInner->options.schemas,
                                                        pTaskInner->options.isSchema, pTaskInner->options.twindows,
                                                        pTaskInner->options.suid, pTaskInner->options.ver, &slotIdList));
    STREAM_CHECK_RET_GOTO(pTaskInner->api.tsdReader.tsdReaderOpen(pVnode, &pTaskInner->cond, &keyInfo, 1, pTaskInner->pResBlock,
                                                             (void**)&pTaskInner->pReader, pTaskInner->idStr, NULL));
    STREAM_CHECK_RET_GOTO(createOneDataBlock(pTaskInner->pResBlock, false, &pTaskInner->pResBlockDst));
  }

  blockDataCleanup(pTaskInner->pResBlockDst);
  bool hasMore = true;
  do {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasMore));
    if (!hasMore) {
      break;
    }

    SSDataBlock* pBlock = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
    // Keep pulling blocks until the page is full; hasMore then stays true for a continuation.
  } while (pTaskInner->pResBlockDst->info.rows < STREAM_RETURN_ROWS_NUM);

  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
  printDataBlock(pTaskInner->pResBlockDst, __func__, "tsdb_data", ((SStreamTask*)pTask)->streamId);
  if (!hasMore) {
    // Reader exhausted: drop the cached task for this uid.
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);

  SRpcMsg rsp = {0};
  rsp.msgType = TDMT_STREAM_TRIGGER_PULL_RSP;
  rsp.info = pMsg->info;
  rsp.pCont = buf;
  rsp.contLen = size;
  rsp.code = code;
  tmsgSendRsp(&rsp);

  taosMemFree(slotIdList);
  taosArrayDestroy(sortedCid);
  taosArrayDestroy(schemas);
  return code;
}
2572

2573
/*
 * Handle a trigger pull request for new WAL meta rows.
 *
 * Fills sStreamReaderInfo->metaBlock through processWalVerMetaNew() starting from
 * req->walMetaNewReq.lastVer, serializes the result and always answers the caller with a
 * TDMT_STREAM_TRIGGER_PULL_RSP message.
 *
 * When no rows were produced (resultRsp.totalRows == 0) the reply carries only the 8-byte
 * version and the RPC code TSDB_CODE_STREAM_NO_DATA; the function itself then returns 0.
 * Note that this also applies when an earlier step failed without producing rows: the
 * earlier error code is replaced by TSDB_CODE_STREAM_NO_DATA (behavior kept as is).
 *
 * Returns 0 on success / no data, otherwise the error code that was also sent in the reply.
 */
static int32_t vnodeProcessStreamWalMetaNewReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t      code = 0;
  int32_t      lino = 0;
  void*        buf = NULL;
  size_t       size = 0;
  size_t       len = 0;
  SSTriggerWalNewRsp resultRsp = {0};

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, request paras lastVer:%" PRId64, TD_VID(pVnode), __func__, req->walMetaNewReq.lastVer);

  // The meta block is created lazily and then reused across requests.
  if (sStreamReaderInfo->metaBlock == NULL) {
    STREAM_CHECK_RET_GOTO(createBlockForWalMetaNew((SSDataBlock**)&sStreamReaderInfo->metaBlock));
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(sStreamReaderInfo->metaBlock, STREAM_RETURN_ROWS_NUM));
  }
  blockDataEmpty(sStreamReaderInfo->metaBlock);
  resultRsp.metaBlock = sStreamReaderInfo->metaBlock;
  resultRsp.ver = req->walMetaNewReq.lastVer;
  STREAM_CHECK_RET_GOTO(processWalVerMetaNew(pVnode, &resultRsp, sStreamReaderInfo, req->walMetaNewReq.ctime));

  ST_TASK_DLOG("vgId:%d %s get result last ver:%"PRId64" rows:%d", TD_VID(pVnode), __func__, resultRsp.ver, resultRsp.totalRows);
  STREAM_CHECK_CONDITION_GOTO(resultRsp.totalRows == 0, TDB_CODE_SUCCESS);

  // First pass computes the payload length, second pass writes into the RPC buffer.
  // `size` is only published once a buffer really exists, so an allocation failure
  // never yields a reply with a NULL payload and a non-zero length.
  len = tSerializeSStreamWalDataResponse(NULL, 0, &resultRsp, NULL);
  buf = rpcMallocCont(len);
  STREAM_CHECK_NULL_GOTO(buf, terrno);
  size = tSerializeSStreamWalDataResponse(buf, len, &resultRsp, NULL);
  printDataBlock(sStreamReaderInfo->metaBlock, __func__, "meta", ((SStreamTask*)pTask)->streamId);

end:
  if (resultRsp.totalRows == 0) {
    buf = rpcMallocCont(sizeof(int64_t));
    if (buf != NULL) {
      *(int64_t *)buf = resultRsp.ver;
      size = sizeof(int64_t);
      code = TSDB_CODE_STREAM_NO_DATA;
    } else {
      // Could not even allocate the 8-byte reply: report the allocation error instead
      // of writing through a NULL pointer.
      code = terrno;
      lino = __LINE__;
      size = 0;
    }
  }
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  // "No data" is a normal outcome for the vnode; only the remote side needs to see it.
  if (code == TSDB_CODE_STREAM_NO_DATA){
    code = 0;
  }
  // NOTE(review): pTask is not assigned when sStreamReaderInfo is NULL; confirm the log
  // macro below does not read it on that path.
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  blockDataDestroy(resultRsp.deleteBlock);
  blockDataDestroy(resultRsp.dropBlock);

  return code;
}
2620
/*
 * Handle a trigger pull request that returns WAL meta rows together with trigger data.
 *
 * Runs processWalVerMetaDataNew() with sStreamReaderInfo->metaBlock as meta block and
 * sStreamReaderInfo->triggerBlock as data block, starting from
 * req->walMetaDataNewReq.lastVer, and always answers with a TDMT_STREAM_TRIGGER_PULL_RSP.
 *
 * When resultRsp.totalRows == 0 the reply carries only the 8-byte version with the RPC code
 * TSDB_CODE_STREAM_NO_DATA (this also replaces an earlier error code, behavior kept as is)
 * and the function returns 0.
 *
 * Returns 0 on success / no data, otherwise the error code that was also sent in the reply.
 */
static int32_t vnodeProcessStreamWalMetaDataNewReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t      code = 0;
  int32_t      lino = 0;
  void*        buf = NULL;
  size_t       size = 0;
  size_t       len = 0;
  SSTriggerWalNewRsp resultRsp = {0};

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, request paras lastVer:%" PRId64, TD_VID(pVnode), __func__, req->walMetaDataNewReq.lastVer);

  // The meta block is created lazily and then reused across requests.
  if (sStreamReaderInfo->metaBlock == NULL) {
    STREAM_CHECK_RET_GOTO(createBlockForWalMetaNew((SSDataBlock**)&sStreamReaderInfo->metaBlock));
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(sStreamReaderInfo->metaBlock, STREAM_RETURN_ROWS_NUM));
  }
  resultRsp.metaBlock = sStreamReaderInfo->metaBlock;
  resultRsp.dataBlock = sStreamReaderInfo->triggerBlock;
  resultRsp.ver = req->walMetaDataNewReq.lastVer;
  STREAM_CHECK_RET_GOTO(processWalVerMetaDataNew(pVnode, sStreamReaderInfo, &resultRsp));

  STREAM_CHECK_CONDITION_GOTO(resultRsp.totalRows == 0, TDB_CODE_SUCCESS);

  // Two-pass serialization: measure, allocate, write. `size` is published only after the
  // buffer exists so a failed allocation cannot produce a NULL payload with a length.
  len = tSerializeSStreamWalDataResponse(NULL, 0, &resultRsp, sStreamReaderInfo->indexHash);
  buf = rpcMallocCont(len);
  STREAM_CHECK_NULL_GOTO(buf, terrno);
  size = tSerializeSStreamWalDataResponse(buf, len, &resultRsp, sStreamReaderInfo->indexHash);
  printDataBlock(sStreamReaderInfo->metaBlock, __func__, "meta", ((SStreamTask*)pTask)->streamId);
  printDataBlock(sStreamReaderInfo->triggerBlock, __func__, "data", ((SStreamTask*)pTask)->streamId);
  printDataBlock(resultRsp.dropBlock, __func__, "drop", ((SStreamTask*)pTask)->streamId);
  printDataBlock(resultRsp.deleteBlock, __func__, "delete", ((SStreamTask*)pTask)->streamId);
  printIndexHash(sStreamReaderInfo->indexHash, pTask);

end:
  if (resultRsp.totalRows == 0) {
    buf = rpcMallocCont(sizeof(int64_t));
    if (buf != NULL) {
      *(int64_t *)buf = resultRsp.ver;
      size = sizeof(int64_t);
      code = TSDB_CODE_STREAM_NO_DATA;
    } else {
      // Report the allocation failure rather than writing through a NULL pointer.
      code = terrno;
      lino = __LINE__;
      size = 0;
    }
  }
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  // "No data" is only meaningful to the remote side; locally it is a success.
  if (code == TSDB_CODE_STREAM_NO_DATA){
    code = 0;
  }
  blockDataDestroy(resultRsp.deleteBlock);
  blockDataDestroy(resultRsp.dropBlock);

  // NOTE(review): pTask is not assigned when sStreamReaderInfo is NULL; confirm the log
  // macro below does not read it on that path.
  STREAM_PRINT_LOG_END_WITHID(code, lino);

  return code;
}
2670

2671
/*
 * Handle a trigger pull request for WAL data of the given versions/ranges.
 *
 * Runs processWalVerDataNew() for req->walDataNewReq.versions / .ranges with
 * sStreamReaderInfo->triggerBlock as the data block and always answers with a
 * TDMT_STREAM_TRIGGER_PULL_RSP.
 *
 * When resultRsp.totalRows == 0 the reply carries only the 8-byte version with the RPC code
 * TSDB_CODE_STREAM_NO_DATA (this also replaces an earlier error code, behavior kept as is)
 * and the function returns 0.
 *
 * Returns 0 on success / no data, otherwise the error code that was also sent in the reply.
 */
static int32_t vnodeProcessStreamWalDataNewReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t      code = 0;
  int32_t      lino = 0;
  void*        buf = NULL;
  size_t       size = 0;
  size_t       len = 0;
  SSTriggerWalNewRsp resultRsp = {0};

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, request paras size:%zu", TD_VID(pVnode), __func__, taosArrayGetSize(req->walDataNewReq.versions));

  resultRsp.dataBlock = sStreamReaderInfo->triggerBlock;
  STREAM_CHECK_RET_GOTO(processWalVerDataNew(pVnode, sStreamReaderInfo, req->walDataNewReq.versions, req->walDataNewReq.ranges, &resultRsp));
  ST_TASK_DLOG("vgId:%d %s get result last ver:%"PRId64" rows:%d", TD_VID(pVnode), __func__, resultRsp.ver, resultRsp.totalRows);

  STREAM_CHECK_CONDITION_GOTO(resultRsp.totalRows == 0, TDB_CODE_SUCCESS);

  // Two-pass serialization: measure, allocate, write. `size` is published only after the
  // buffer exists so a failed allocation cannot produce a NULL payload with a length.
  len = tSerializeSStreamWalDataResponse(NULL, 0, &resultRsp, sStreamReaderInfo->indexHash);
  buf = rpcMallocCont(len);
  STREAM_CHECK_NULL_GOTO(buf, terrno);
  size = tSerializeSStreamWalDataResponse(buf, len, &resultRsp, sStreamReaderInfo->indexHash);
  printDataBlock(sStreamReaderInfo->triggerBlock, __func__, "data", ((SStreamTask*)pTask)->streamId);
  printIndexHash(sStreamReaderInfo->indexHash, pTask);

end:
  if (resultRsp.totalRows == 0) {
    buf = rpcMallocCont(sizeof(int64_t));
    if (buf != NULL) {
      *(int64_t *)buf = resultRsp.ver;
      size = sizeof(int64_t);
      code = TSDB_CODE_STREAM_NO_DATA;
    } else {
      // Report the allocation failure rather than writing through a NULL pointer.
      code = terrno;
      lino = __LINE__;
      size = 0;
    }
  }
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  // "No data" is only meaningful to the remote side; locally it is a success.
  if (code == TSDB_CODE_STREAM_NO_DATA){
    code = 0;
  }

  blockDataDestroy(resultRsp.deleteBlock);
  blockDataDestroy(resultRsp.dropBlock);
  // NOTE(review): pTask is not assigned when sStreamReaderInfo is NULL; confirm the log
  // macro below does not read it on that path.
  STREAM_PRINT_LOG_END_WITHID(code, lino);

  return code;
}
2714

2715
/*
 * Handle a trigger pull request for WAL data that is destined for calculation.
 *
 * For virtual-table streams (sStreamReaderInfo->isVtableStream) the WAL data is read
 * directly into calcBlock and resultRsp.isCalc is set. Otherwise the data is read into
 * triggerBlock, copied (createOneDataBlock with copy=true) and transformed into a fresh
 * block shaped like calcBlock, which is then what gets serialized.
 *
 * Always answers with a TDMT_STREAM_TRIGGER_PULL_RSP. When resultRsp.totalRows == 0 the
 * reply carries only the 8-byte version with the RPC code TSDB_CODE_STREAM_NO_DATA (this
 * also replaces an earlier error code, behavior kept as is) and the function returns 0.
 *
 * Returns 0 on success / no data, otherwise the error code that was also sent in the reply.
 */
static int32_t vnodeProcessStreamWalCalcDataNewReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t      code = 0;
  int32_t      lino = 0;
  void*        buf = NULL;
  size_t       size = 0;
  size_t       len = 0;
  SSTriggerWalNewRsp resultRsp = {0};
  SSDataBlock* pBlock1 = NULL;  // copy of triggerBlock including its rows
  SSDataBlock* pBlock2 = NULL;  // empty block with calcBlock's layout, filled by the transform

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, request paras size:%zu", TD_VID(pVnode), __func__, taosArrayGetSize(req->walDataNewReq.versions));

  resultRsp.dataBlock = sStreamReaderInfo->isVtableStream ? sStreamReaderInfo->calcBlock : sStreamReaderInfo->triggerBlock;
  resultRsp.isCalc = sStreamReaderInfo->isVtableStream ? true : false;
  STREAM_CHECK_RET_GOTO(processWalVerDataNew(pVnode, sStreamReaderInfo, req->walDataNewReq.versions, req->walDataNewReq.ranges, &resultRsp));
  STREAM_CHECK_CONDITION_GOTO(resultRsp.totalRows == 0, TDB_CODE_SUCCESS);

  if (!sStreamReaderInfo->isVtableStream){
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerBlock, true, &pBlock1));
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->calcBlock, false, &pBlock2));

    blockDataTransform(pBlock2, pBlock1);
    resultRsp.dataBlock = pBlock2;
  }

  // Two-pass serialization: measure, allocate, write. `size` is published only after the
  // buffer exists so a failed allocation cannot produce a NULL payload with a length.
  len = tSerializeSStreamWalDataResponse(NULL, 0, &resultRsp, sStreamReaderInfo->indexHash);
  buf = rpcMallocCont(len);
  STREAM_CHECK_NULL_GOTO(buf, terrno);
  size = tSerializeSStreamWalDataResponse(buf, len, &resultRsp, sStreamReaderInfo->indexHash);
  printDataBlock(resultRsp.dataBlock, __func__, "data", ((SStreamTask*)pTask)->streamId);
  printIndexHash(sStreamReaderInfo->indexHash, pTask);

end:
  if (resultRsp.totalRows == 0) {
    buf = rpcMallocCont(sizeof(int64_t));
    if (buf != NULL) {
      *(int64_t *)buf = resultRsp.ver;
      size = sizeof(int64_t);
      code = TSDB_CODE_STREAM_NO_DATA;
    } else {
      // Report the allocation failure rather than writing through a NULL pointer.
      code = terrno;
      lino = __LINE__;
      size = 0;
    }
  }
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  // "No data" is only meaningful to the remote side; locally it is a success.
  if (code == TSDB_CODE_STREAM_NO_DATA){
    code = 0;
  }

  blockDataDestroy(pBlock1);
  blockDataDestroy(pBlock2);
  blockDataDestroy(resultRsp.deleteBlock);
  blockDataDestroy(resultRsp.dropBlock);
  // NOTE(review): pTask is not assigned when sStreamReaderInfo is NULL; confirm the log
  // macro below does not read it on that path.
  STREAM_PRINT_LOG_END_WITHID(code, lino);

  return code;
}
2769

2770
/*
 * Handle a trigger pull request for the group column values of one group id.
 *
 * Looks req->groupColValueReq.gid up in sStreamReaderInfo->groupIdMap, serializes the
 * stored group info and always answers with a TDMT_STREAM_TRIGGER_PULL_RSP. On any failure
 * the payload is released and the reply carries only the error code.
 *
 * Returns 0 on success, TSDB_CODE_STREAM_NO_CONTEXT when the group id is unknown, or the
 * error reported by allocation / serialization.
 */
static int32_t vnodeProcessStreamGroupColValueReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;
  // Serializer result is kept in a signed variable: the original stored it in the unsigned
  // `size`, which made the `< 0` error checks below impossible to trigger.
  int32_t len = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, request gid:%" PRId64, TD_VID(pVnode), __func__, req->groupColValueReq.gid);

  SArray** gInfo = taosHashGet(sStreamReaderInfo->groupIdMap, &req->groupColValueReq.gid, POINTER_BYTES);
  STREAM_CHECK_NULL_GOTO(gInfo, TSDB_CODE_STREAM_NO_CONTEXT);
  SStreamGroupInfo pGroupInfo = {0};
  pGroupInfo.gInfo = *gInfo;

  // Pass 1: compute the encoded length.
  len = tSerializeSStreamGroupInfo(NULL, 0, &pGroupInfo, TD_VID(pVnode));
  STREAM_CHECK_CONDITION_GOTO(len < 0, len);
  buf = rpcMallocCont(len);
  STREAM_CHECK_NULL_GOTO(buf, terrno);
  // Pass 2: encode into the RPC buffer.
  len = tSerializeSStreamGroupInfo(buf, len, &pGroupInfo, TD_VID(pVnode));
  STREAM_CHECK_CONDITION_GOTO(len < 0, len);
  size = (size_t)len;
end:
  if (code != 0) {
    rpcFreeCont(buf);
    buf = NULL;
    size = 0;
  }
  // NOTE(review): pTask is not assigned when sStreamReaderInfo is NULL; confirm the log
  // macro below does not read it on that path.
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);

  return code;
}
2804

2805
/*
 * Handle a trigger pull request for the column references of every virtual table in the
 * reader's table list.
 *
 * For each table in sStreamReaderInfo->tableList one VTableInfo (uid, group id, column
 * references) is appended to the reply:
 *   - if the request lists exactly one column id and it is PRIMARYKEY_TIMESTAMP_COL_ID,
 *     all column references of the table are copied;
 *   - otherwise one slot per requested column id is produced, filled with the matching
 *     reference (only entries with hasRef set are considered) and left zeroed when no
 *     match exists.
 *
 * Always answers with a TDMT_STREAM_TRIGGER_PULL_RSP. Returns 0 on success or the error
 * code that was also sent in the reply.
 */
static int32_t vnodeProcessStreamVTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t              code = 0;
  int32_t              lino = 0;
  void*                buf = NULL;
  size_t               size = 0;
  SStreamMsgVTableInfo vTableInfo = {0};
  SMetaReader          metaReader = {0};
  SStorageAPI api = {0};
  initStorageAPI(&api);

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  SArray* cids = req->virTableInfoReq.cids;
  STREAM_CHECK_NULL_GOTO(cids, terrno);

  SArray* pTableListArray = qStreamGetTableArrayList(sStreamReaderInfo->tableList);
  STREAM_CHECK_NULL_GOTO(pTableListArray, terrno);

  vTableInfo.infos = taosArrayInit(taosArrayGetSize(pTableListArray), sizeof(VTableInfo));
  STREAM_CHECK_NULL_GOTO(vTableInfo.infos, terrno);
  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);

  // The request's column list does not change while iterating over tables.
  size_t nCids = taosArrayGetSize(cids);
  bool   wantAllRefs = (nCids == 1) && (*(col_id_t*)taosArrayGet(cids, 0) == PRIMARYKEY_TIMESTAMP_COL_ID);

  for (size_t i = 0; i < taosArrayGetSize(pTableListArray); i++) {
    STableKeyInfo* pKeyInfo = taosArrayGet(pTableListArray, i);
    if (pKeyInfo == NULL) {
      continue;
    }
    VTableInfo* vTable = taosArrayReserve(vTableInfo.infos, 1);
    STREAM_CHECK_NULL_GOTO(vTable, terrno);
    vTable->uid = pKeyInfo->uid;
    vTable->gId = pKeyInfo->groupId;

    // NOTE(review): the lookup result is stored but never checked, and it is overwritten
    // before the function returns; confirm whether a failed lookup should abort or skip.
    code = api.metaReaderFn.getTableEntryByUid(&metaReader, pKeyInfo->uid);
    if (wantAllRefs) {
      vTable->cols.nCols = metaReader.me.colRef.nCols;
      vTable->cols.version = metaReader.me.colRef.version;
      vTable->cols.pColRef = taosMemoryCalloc(metaReader.me.colRef.nCols, sizeof(SColRef));
      // A zero-length allocation may legitimately be NULL; only a non-empty one is an error.
      if (metaReader.me.colRef.nCols > 0) {
        STREAM_CHECK_NULL_GOTO(vTable->cols.pColRef, terrno);
      }
      for (size_t j = 0; j < metaReader.me.colRef.nCols; j++) {
        memcpy(vTable->cols.pColRef + j, &metaReader.me.colRef.pColRef[j], sizeof(SColRef));
      }
    } else {
      vTable->cols.nCols = nCids;
      vTable->cols.version = metaReader.me.colRef.version;
      vTable->cols.pColRef = taosMemoryCalloc(nCids, sizeof(SColRef));
      if (nCids > 0) {
        STREAM_CHECK_NULL_GOTO(vTable->cols.pColRef, terrno);
      }
      // Slot k of the output corresponds to the k-th requested column id.
      for (size_t k = 0; k < nCids; k++) {
        col_id_t wanted = *(col_id_t*)taosArrayGet(cids, k);
        for (size_t j = 0; j < metaReader.me.colRef.nCols; j++) {
          if (metaReader.me.colRef.pColRef[j].hasRef &&
              metaReader.me.colRef.pColRef[j].id == wanted) {
            memcpy(vTable->cols.pColRef + k, &metaReader.me.colRef.pColRef[j], sizeof(SColRef));
            break;
          }
        }
      }
    }
    tDecoderClear(&metaReader.coder);
  }
  ST_TASK_DLOG("vgId:%d %s end", TD_VID(pVnode), __func__);
  STREAM_CHECK_RET_GOTO(buildVTableInfoRsp(&vTableInfo, &buf, &size));

end:
  tDestroySStreamMsgVTableInfo(&vTableInfo);
  api.metaReaderFn.clearReader(&metaReader);
  // NOTE(review): pTask is not assigned when sStreamReaderInfo is NULL; confirm the log
  // macro below does not read it on that path.
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  return code;
}
2875

2876
/*
 * Handle a trigger pull request that resolves original (referenced) tables and columns.
 *
 * For every (refTableName, refColName) pair in req->origTableInfoReq.cols the table is
 * looked up by name and one OTableInfoRsp is appended to the reply with:
 *   - uid:  uid of the table,
 *   - suid: super-table uid for child tables, 0 for normal tables,
 *   - cid:  id of the column whose name equals refColName, 0 when no such column exists
 *           (an error is logged in that case, the request still succeeds).
 * A table that is neither a child table nor a normal table fails the request with
 * TSDB_CODE_INVALID_PARA.
 *
 * Always answers with a TDMT_STREAM_TRIGGER_PULL_RSP. Returns 0 on success or the error
 * code that was also sent in the reply.
 */
static int32_t vnodeProcessStreamOTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req) {
  int32_t                   code = 0;
  int32_t                   lino = 0;
  void*                     buf = NULL;
  size_t                    size = 0;
  SSTriggerOrigTableInfoRsp oTableInfo = {0};
  SMetaReader               metaReader = {0};
  int64_t streamId = req->base.streamId;
  stsDebug("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStorageAPI api = {0};
  initStorageAPI(&api);

  SArray* cols = req->origTableInfoReq.cols;
  STREAM_CHECK_NULL_GOTO(cols, terrno);

  oTableInfo.cols = taosArrayInit(taosArrayGetSize(cols), sizeof(OTableInfoRsp));

  STREAM_CHECK_NULL_GOTO(oTableInfo.cols, terrno);

  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
  for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
    OTableInfo*    oInfo = taosArrayGet(cols, i);
    OTableInfoRsp* vTableInfo = taosArrayReserve(oTableInfo.cols, 1);
    STREAM_CHECK_NULL_GOTO(oInfo, terrno);
    STREAM_CHECK_NULL_GOTO(vTableInfo, terrno);
    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByName(&metaReader, oInfo->refTableName));
    vTableInfo->uid = metaReader.me.uid;
    stsDebug("vgId:%d %s uid:%"PRId64, TD_VID(pVnode), __func__, vTableInfo->uid);

    SSchemaWrapper* sSchemaWrapper = NULL;
    if (metaReader.me.type == TD_CHILD_TABLE) {
      // Child tables carry no schema of their own: re-read the super table entry.
      int64_t suid = metaReader.me.ctbEntry.suid;
      vTableInfo->suid = suid;
      tDecoderClear(&metaReader.coder);
      STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, suid));
      sSchemaWrapper = &metaReader.me.stbEntry.schemaRow;
    } else if (metaReader.me.type == TD_NORMAL_TABLE) {
      vTableInfo->suid = 0;
      sSchemaWrapper = &metaReader.me.ntbEntry.schemaRow;
    } else {
      // No schema is available for other table types; bail out instead of dereferencing
      // the NULL schema pointer in the column search below.
      stError("invalid table type:%d", metaReader.me.type);
      code = TSDB_CODE_INVALID_PARA;
      lino = __LINE__;
      goto end;
    }

    // 0 doubles as the "column not found" marker checked after the search, so make sure
    // the freshly reserved element starts from it.
    vTableInfo->cid = 0;
    for (size_t j = 0; j < sSchemaWrapper->nCols; j++) {
      SSchema* s = sSchemaWrapper->pSchema + j;
      if (strcmp(s->name, oInfo->refColName) == 0) {
        vTableInfo->cid = s->colId;
        break;
      }
    }
    if (vTableInfo->cid == 0) {
      stError("vgId:%d %s, not found col %s in table %s", TD_VID(pVnode), __func__, oInfo->refColName,
              oInfo->refTableName);
    }
    tDecoderClear(&metaReader.coder);
  }

  STREAM_CHECK_RET_GOTO(buildOTableInfoRsp(&oTableInfo, &buf, &size));

end:
  tDestroySTriggerOrigTableInfoRsp(&oTableInfo);
  api.metaReaderFn.clearReader(&metaReader);
  STREAM_PRINT_LOG_END(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  return code;
}
2945

2946
/*
 * Handle a trigger pull request for the pseudo columns (table name / tag values) of one
 * virtual table.
 *
 * The table req->virTablePseudoColReq.uid must be a virtual normal table or a virtual
 * child table, otherwise the request fails with TSDB_CODE_INVALID_PARA. A single-row block
 * is built with one column per requested id in req->virTablePseudoColReq.cids:
 *   - id -1 yields the table name;
 *   - for virtual child tables any other id is looked up in the super table's tag schema;
 *     a known tag yields its value from the child table's tags, an unknown id yields a
 *     NULL-typed column holding NULL;
 *   - for virtual normal tables the first requested id must be -1 and only the table name
 *     column is produced.
 *
 * Always answers with a TDMT_STREAM_TRIGGER_PULL_RSP. If no payload was produced
 * (size == 0) the code sent and returned is TSDB_CODE_STREAM_NO_DATA, which replaces any
 * earlier error code (behavior kept as is).
 */
static int32_t vnodeProcessStreamVTableTagInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req) {
  int32_t                   code = 0;
  int32_t                   lino = 0;
  void*                     buf = NULL;
  size_t                    size = 0;
  SSDataBlock* pBlock = NULL;

  SMetaReader               metaReader = {0};
  SMetaReader               metaReaderStable = {0};
  int64_t streamId = req->base.streamId;
  stsDebug("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStorageAPI api = {0};
  initStorageAPI(&api);

  SArray* cols = req->virTablePseudoColReq.cids;
  STREAM_CHECK_NULL_GOTO(cols, terrno);

  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
  STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, req->virTablePseudoColReq.uid));

  STREAM_CHECK_CONDITION_GOTO(metaReader.me.type != TD_VIRTUAL_CHILD_TABLE && metaReader.me.type != TD_VIRTUAL_NORMAL_TABLE, TSDB_CODE_INVALID_PARA);

  STREAM_CHECK_RET_GOTO(createDataBlock(&pBlock));
  if (metaReader.me.type == TD_VIRTUAL_NORMAL_TABLE) {
    STREAM_CHECK_CONDITION_GOTO (taosArrayGetSize(cols) < 1 || *(col_id_t*)taosArrayGet(cols, 0) != -1, TSDB_CODE_INVALID_PARA);
    SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN, -1);
    STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, 1));
    pBlock->info.rows = 1;
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, 0);
    STREAM_CHECK_NULL_GOTO(pDst, terrno);
    STREAM_CHECK_RET_GOTO(varColSetVarData(pDst, 0, metaReader.me.name, strlen(metaReader.me.name), false));
  } else if (metaReader.me.type == TD_VIRTUAL_CHILD_TABLE){
    int64_t suid = metaReader.me.ctbEntry.suid;
    // The child reader's lock is released before a second reader is opened for the super
    // table; the already decoded child entry (name, tags) is still read below.
    api.metaReaderFn.readerReleaseLock(&metaReader);
    api.metaReaderFn.initReader(&metaReaderStable, pVnode, META_READER_LOCK, &api.metaFn);

    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReaderStable, suid));
    SSchemaWrapper*  sSchemaWrapper = &metaReaderStable.me.stbEntry.schemaTag;

    // Step 1: declare one output column per requested id, in request order.
    for (size_t i = 0; i < taosArrayGetSize(cols); i++){
      col_id_t* id = taosArrayGet(cols, i);
      STREAM_CHECK_NULL_GOTO(id, terrno);
      if (*id == -1) {
        SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN, -1);
        STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
        continue;
      }
      size_t j = 0;
      for (; j < sSchemaWrapper->nCols; j++) {
        SSchema* s = sSchemaWrapper->pSchema + j;
        if (s->colId == *id) {
          SColumnInfoData idata = createColumnInfoData(s->type, s->bytes, s->colId);
          STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
          break;
        }
      }
      // Id not present in the tag schema: keep the column position with a NULL-typed column.
      if (j == sSchemaWrapper->nCols) {
        SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_NULL, CHAR_BYTES, *id);
        STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
      }
    }
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, 1));
    pBlock->info.rows = 1;

    // Step 2: fill row 0 of every declared column.
    for (size_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++){
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
      STREAM_CHECK_NULL_GOTO(pDst, terrno);

      if (pDst->info.colId == -1) {
        STREAM_CHECK_RET_GOTO(varColSetVarData(pDst, 0, metaReader.me.name, strlen(metaReader.me.name), false));
        continue;
      }
      if (pDst->info.type == TSDB_DATA_TYPE_NULL) {
        STREAM_CHECK_RET_GOTO(colDataSetVal(pDst, 0, NULL, true));
        continue;
      }

      STagVal val = {0};
      val.cid = pDst->info.colId;
      const char* p = api.metaFn.extractTagVal(metaReader.me.ctbEntry.pTags, pDst->info.type, &val);

      char* data = NULL;
      if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
        data = tTagValToData((const STagVal*)p, false);
      } else {
        data = (char*)p;
      }

      // Same condition under which the original released `data` after the write.
      bool ownsData = (pDst->info.type != TSDB_DATA_TYPE_JSON) && (p != NULL) &&
                      IS_VAR_DATA_TYPE(((const STagVal*)p)->type) && (data != NULL);

      code = colDataSetVal(pDst, 0, data,
                           (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)));

      // Release the temporary buffer before checking the result so it is not leaked when
      // the write fails and control jumps to `end`.
      if (ownsData) {
        taosMemoryFree(data);
      }
      STREAM_CHECK_RET_GOTO(code);
    }
  } else {
    // Defensive: the type check above already rejects every other table type.
    stError("vgId:%d %s, invalid table type:%d", TD_VID(pVnode), __func__, metaReader.me.type);
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  stsDebug("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlock->info.rows);
  printDataBlock(pBlock, __func__, "", streamId);
  STREAM_CHECK_RET_GOTO(buildRsp(pBlock, &buf, &size));

end:
  // NOTE(review): this also turns every earlier failure into "no data"; confirm intended.
  if(size == 0){
    code = TSDB_CODE_STREAM_NO_DATA;
  }
  api.metaReaderFn.clearReader(&metaReaderStable);
  api.metaReaderFn.clearReader(&metaReader);
  STREAM_PRINT_LOG_END(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  blockDataDestroy(pBlock);
  return code;
}
3066

3067
/**
 * Handle a TDMT_STREAM_FETCH request and always answer it with a TDMT_STREAM_FETCH_RSP.
 *
 * Steps:
 *   1. Deserialize the SResFetchReq carried by pMsg and look up the calc info selected by
 *      (queryId, taskId, execId).
 *   2. If the request asks for a reset, recompute the scan time ranges, swap the runtime function
 *      info from the request into the calc info, and either reset the existing executor task or
 *      destroy and re-create it.
 *   3. Run the executor task, collect the produced blocks, and serialize them into the response.
 *
 * @param pVnode  vnode that received the request
 * @param pMsg    incoming RPC message; its `info` is copied into the response
 * @return 0 on success, otherwise the first error that occurred. "Table not exist" errors are
 *         mapped to success. The same code is placed in the response.
 */
static int32_t vnodeProcessStreamFetchMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;
  void*   taskAddr = NULL;
  SArray* pResList = NULL;
  bool    hasNext = false;
  // ST_TASK_DLOG appears to read `pTask` from the enclosing scope (NOTE(review): confirm against
  // the macro definition). It is declared up front and kept NULL until resolved, so the cleanup
  // path never touches an indeterminate pointer after an early `goto end`.
  void* pTask = NULL;

  SResFetchReq req = {0};
  STREAM_CHECK_CONDITION_GOTO(tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0,
                              TSDB_CODE_QRY_INVALID_INPUT);
  SArray* calcInfoList = (SArray*)qStreamGetReaderInfo(req.queryId, req.taskId, &taskAddr);
  STREAM_CHECK_NULL_GOTO(calcInfoList, terrno);

  STREAM_CHECK_CONDITION_GOTO(req.execId < 0, TSDB_CODE_INVALID_PARA);
  SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo = taosArrayGetP(calcInfoList, req.execId);
  STREAM_CHECK_NULL_GOTO(sStreamReaderCalcInfo, terrno);
  pTask = sStreamReaderCalcInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, execId:%d, reset:%d, pTaskInfo:%p, scan type:%d", TD_VID(pVnode), __func__, req.execId, req.reset,
               sStreamReaderCalcInfo->pTaskInfo, nodeType(sStreamReaderCalcInfo->calcAst->pNode));

  if (req.reset) {
    // Everything below dereferences the runtime function info, so reject a request without it
    // instead of crashing.
    STREAM_CHECK_NULL_GOTO(req.pStRtFuncInfo, TSDB_CODE_INVALID_PARA);

    // With a dynamic table name, the uid to scan is carried by the partition value flagged as tbname.
    int64_t uid = 0;
    if (req.dynTbname) {
      SArray* vals = req.pStRtFuncInfo->pStreamPartColVals;
      for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
        SStreamGroupValue* pValue = taosArrayGet(vals, i);
        if (pValue != NULL && pValue->isTbname) {
          uid = pValue->uid;
          break;
        }
      }
    }

    SReadHandle handle = {0};
    handle.vnode = pVnode;
    handle.uid = uid;

    initStorageAPI(&handle.api);
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(sStreamReaderCalcInfo->calcAst->pNode) ||
        QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == nodeType(sStreamReaderCalcInfo->calcAst->pNode)) {
      STableScanPhysiNode* pScanNode = (STableScanPhysiNode*)(sStreamReaderCalcInfo->calcAst->pNode);

      STimeRangeNode* node = (STimeRangeNode*)pScanNode->pTimeRange;
      if (node != NULL) {
        STREAM_CHECK_RET_GOTO(processCalaTimeRange(sStreamReaderCalcInfo, &req, node, &handle, false));
      } else {
        ST_TASK_DLOG("vgId:%d %s no scan time range node", TD_VID(pVnode), __func__);
      }

      node = (STimeRangeNode*)pScanNode->pExtTimeRange;
      if (node != NULL) {
        STREAM_CHECK_RET_GOTO(processCalaTimeRange(sStreamReaderCalcInfo, &req, node, &handle, true));
      } else {
        ST_TASK_DLOG("vgId:%d %s no interp time range node", TD_VID(pVnode), __func__);
      }
    }

    // Hand the request's runtime info over to the calc info; the previous contents end up in the
    // request and are released together with it by tDestroySResFetchReq.
    TSWAP(sStreamReaderCalcInfo->rtInfo.funcInfo, *req.pStRtFuncInfo);
    handle.streamRtInfo = &sStreamReaderCalcInfo->rtInfo;

    if (sStreamReaderCalcInfo->pTaskInfo == NULL || !qNeedReset(sStreamReaderCalcInfo->pTaskInfo)) {
      qDestroyTask(sStreamReaderCalcInfo->pTaskInfo);
      // Do not keep a dangling pointer around in case the re-creation below fails.
      sStreamReaderCalcInfo->pTaskInfo = NULL;
      STREAM_CHECK_RET_GOTO(qCreateStreamExecTaskInfo(&sStreamReaderCalcInfo->pTaskInfo,
                                                    sStreamReaderCalcInfo->calcScanPlan, &handle, NULL, TD_VID(pVnode),
                                                    req.taskId));
    } else {
      handle.version = pVnode->state.applied;
      STREAM_CHECK_RET_GOTO(qResetTableScan(sStreamReaderCalcInfo->pTaskInfo, &handle));
    }

    STREAM_CHECK_RET_GOTO(qSetTaskId(sStreamReaderCalcInfo->pTaskInfo, req.taskId, req.queryId));
  }

  if (req.pOpParam != NULL) {
    qUpdateOperatorParam(sStreamReaderCalcInfo->pTaskInfo, req.pOpParam);
  }

  pResList = taosArrayInit(4, POINTER_BYTES);
  STREAM_CHECK_NULL_GOTO(pResList, terrno);
  uint64_t ts = 0;
  STREAM_CHECK_RET_GOTO(qExecTaskOpt(sStreamReaderCalcInfo->pTaskInfo, pResList, &ts, &hasNext, NULL, req.pOpParam != NULL));

  // Debug dump of every block produced by the executor.
  for (size_t i = 0; i < taosArrayGetSize(pResList); i++) {
    SSDataBlock* pBlock = taosArrayGetP(pResList, i);
    if (pBlock == NULL) continue;
    printDataBlock(pBlock, __func__, "fetch", ((SStreamTask*)pTask)->streamId);
  }

end:
  if (pTask != NULL) {
    ST_TASK_DLOG("vgId:%d %s start to build rsp", TD_VID(pVnode), __func__);
  }

  // The response body is built on every path, as before. The call is made directly rather than
  // through STREAM_CHECK_RET_GOTO: that macro jumps to `end` on failure, which from here would be
  // a backwards jump that retries forever, and it must not replace an error recorded earlier.
  int32_t rspCode = streamBuildFetchRsp(pResList, hasNext, &buf, &size, pVnode->config.tsdbCfg.precision);
  if (rspCode != 0 && code == 0) {
    code = rspCode;
    lino = __LINE__;
  }

  if (pTask != NULL) {
    ST_TASK_DLOG("vgId:%d %s end:", TD_VID(pVnode), __func__);
  }

  taosArrayDestroy(pResList);
  streamReleaseTask(taskAddr);

  // A table that no longer exists is reported to the requester as success.
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_TDB_TABLE_NOT_EXIST) {
    code = TDB_CODE_SUCCESS;
  }
  STREAM_PRINT_LOG_END(code, lino);
  SRpcMsg rsp = {.msgType = TDMT_STREAM_FETCH_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  tDestroySResFetchReq(&req);
  return code;
}
3178

3179
/**
 * Entry point for messages taken from the vnode's stream reader queue.
 *
 * TDMT_STREAM_FETCH messages are forwarded to vnodeProcessStreamFetchMsg. TDMT_STREAM_TRIGGER_PULL
 * messages are deserialized and dispatched to the handler matching the pull request type. If the
 * vnode is not ready for reads the message is redirected and 0 is returned.
 *
 * @param pVnode  vnode that received the message
 * @param pMsg    incoming RPC message
 * @return 0 on success or redirect, otherwise the error code of the failing step
 */
int32_t vnodeProcessStreamReaderMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  int32_t                   code = 0;
  int32_t                   lino = 0;
  SSTriggerPullRequestUnion req = {0};
  void*                     taskAddr = NULL;

  vDebug("vgId:%d, msg:%p in stream reader queue is processing", pVnode->config.vgId, pMsg);
  if (!syncIsReadyForRead(pVnode->sync)) {
    vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
    return 0;
  }

  if (pMsg->msgType == TDMT_STREAM_FETCH) {
    return vnodeProcessStreamFetchMsg(pVnode, pMsg);
  } else if (pMsg->msgType == TDMT_STREAM_TRIGGER_PULL) {
    // The request payload follows the message head.
    void*   pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
    int32_t len = pMsg->contLen - sizeof(SMsgHead);
    STREAM_CHECK_RET_GOTO(tDeserializeSTriggerPullRequest(pReq, len, &req));
    stDebug("vgId:%d %s start, type:%d, streamId:%" PRIx64 ", readerTaskId:%" PRIx64 ", sessionId:%" PRIx64,
            TD_VID(pVnode), __func__, req.base.type, req.base.streamId, req.base.readerTaskId, req.base.sessionId);

    // STRIGGER_PULL_OTABLE_INFO does not use reader info, so the lookup is skipped for it.
    SStreamTriggerReaderInfo* sStreamReaderInfo = (STRIGGER_PULL_OTABLE_INFO == req.base.type) ? NULL : qStreamGetReaderInfo(req.base.streamId, req.base.readerTaskId, &taskAddr);
    if (sStreamReaderInfo != NULL) {
      // Lazily build the table lists and the filter the first time this reader is used.
      // The outcome is collected in a local and checked only after the mutex has been released:
      // jumping to `end` from inside the critical section would leave the mutex locked.
      int32_t initCode = 0;
      (void)taosThreadMutexLock(&sStreamReaderInfo->mutex);
      if (sStreamReaderInfo->tableList == NULL) {
        initCode = generateTablistForStreamReader(pVnode, sStreamReaderInfo, false);
        if (initCode == 0) {
          initCode = generateTablistForStreamReader(pVnode, sStreamReaderInfo, true);
        }
        if (initCode == 0) {
          initCode = filterInitFromNode(sStreamReaderInfo->pConditions, &sStreamReaderInfo->pFilterInfo, 0, NULL);
        }
      }
      (void)taosThreadMutexUnlock(&sStreamReaderInfo->mutex);
      STREAM_CHECK_RET_GOTO(initCode);
      sStreamReaderInfo->pVnode = pVnode;
    }

    switch (req.base.type) {
      case STRIGGER_PULL_SET_TABLE:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamSetTableReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_LAST_TS:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamLastTsReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_FIRST_TS:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamFirstTsReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_TSDB_META:
      case STRIGGER_PULL_TSDB_META_NEXT:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbMetaReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_TSDB_TS_DATA:
        // The handler is chosen from the reader info, so it must exist before it is dereferenced.
        STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
        if (sStreamReaderInfo->isVtableStream) {
          STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbTsDataReqVTable(pVnode, pMsg, &req, sStreamReaderInfo));
        } else {
          STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbTsDataReqNonVTable(pVnode, pMsg, &req, sStreamReaderInfo));
        }
        break;
      case STRIGGER_PULL_TSDB_TRIGGER_DATA:
      case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbTriggerDataReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_TSDB_CALC_DATA:
      case STRIGGER_PULL_TSDB_CALC_DATA_NEXT:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbCalcDataReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_TSDB_DATA:
      case STRIGGER_PULL_TSDB_DATA_NEXT:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbVirtalDataReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_GROUP_COL_VALUE:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamGroupColValueReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_VTABLE_INFO:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamVTableInfoReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_VTABLE_PSEUDO_COL:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamVTableTagInfoReq(pVnode, pMsg, &req));
        break;
      case STRIGGER_PULL_OTABLE_INFO:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamOTableInfoReq(pVnode, pMsg, &req));
        break;
      case STRIGGER_PULL_WAL_META_NEW:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalMetaNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_WAL_DATA_NEW:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalDataNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_WAL_META_DATA_NEW:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalMetaDataNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_WAL_CALC_DATA_NEW:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalCalcDataNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      default:
        vError("unknown inner msg type:%d in stream reader queue", req.base.type);
        STREAM_CHECK_RET_GOTO(TSDB_CODE_APP_ERROR);
        break;
    }
  } else {
    vError("unknown msg type:%d in stream reader queue", pMsg->msgType);
    STREAM_CHECK_RET_GOTO(TSDB_CODE_APP_ERROR);
  }

end:
  // Common cleanup: drop the task reference taken by the reader-info lookup (taskAddr stays NULL
  // when no lookup happened) and free whatever the request deserialization allocated.
  streamReleaseTask(taskAddr);

  tDestroySTriggerPullRequest(&req);
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc