• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4838

08 Nov 2025 04:37AM UTC coverage: 71.256% (+12.3%) from 58.963%
#4838

push

travis-ci

web-flow
test: adjust source list (#33506)

243241 of 341361 relevant lines covered (71.26%)

281946921.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.03
/source/dnode/vnode/src/vnd/vnodeStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include <stdbool.h>
17
#include <stdint.h>
18
#include "nodes.h"
19
#include "osMemPool.h"
20
#include "osMemory.h"
21
#include "scalar.h"
22
#include "streamReader.h"
23
#include "taosdef.h"
24
#include "tarray.h"
25
#include "tcommon.h"
26
#include "tdatablock.h"
27
#include "tdb.h"
28
#include "tdef.h"
29
#include "tencode.h"
30
#include "tglobal.h"
31
#include "thash.h"
32
#include "tlist.h"
33
#include "tmsg.h"
34
#include "tsimplehash.h"
35
#include "vnd.h"
36
#include "vnode.h"
37
#include "vnodeInt.h"
38
#include "executor.h"
39

40
/*
 * Declare and initialize a local SStreamTriggerReaderTaskInnerOptions named `options`.
 *
 * `.suid` is taken from the reader info only when no map info is supplied; otherwise it is 0.
 *
 * The reader-info parameter is deliberately NOT spelled `sStreamReaderInfo`: a macro parameter
 * with that spelling would also rewrite the `.sStreamReaderInfo` designator, so the macro would
 * only compile when the caller's argument happened to be an identifier with exactly that name.
 * Every argument is parenthesized so that arbitrary expressions can be passed safely.
 *
 * The expansion keeps its trailing `;` for compatibility with existing call sites.
 */
#define BUILD_OPTION(options, _readerInfo, _ver, _order, startTime, endTime, _schemas, _isSchema, _scanMode, \
                     _gid, _initReader, _mapInfo)                                                            \
  SStreamTriggerReaderTaskInnerOptions options = {.suid = ((_mapInfo) == NULL ? (_readerInfo)->suid : 0),    \
                                                  .ver = (_ver),                                             \
                                                  .order = (_order),                                         \
                                                  .twindows = {.skey = (startTime), .ekey = (endTime)},      \
                                                  .sStreamReaderInfo = (_readerInfo),                        \
                                                  .schemas = (_schemas),                                     \
                                                  .isSchema = (_isSchema),                                   \
                                                  .scanMode = (_scanMode),                                   \
                                                  .gid = (_gid),                                             \
                                                  .initReader = (_initReader),                               \
                                                  .mapInfo = (_mapInfo)};
53

54
/*
 * Per-id time range collected while scanning one WAL submit message.
 * Instances are stored by value in a hash keyed by `id`.
 */
typedef struct WalMetaResult {
  uint64_t id;    // identifier filled in by uidInTableList() (group id or table uid)
  int64_t  skey;  // smallest first-row timestamp seen for this id
  int64_t  ekey;  // largest last-row timestamp seen for this id
} WalMetaResult;
59

60
/*
 * Combine a session id and a type into one 64-bit key: `type` occupies the upper 32 bits and
 * is OR-ed with `session`.
 *
 * The shift is performed on unsigned operands because left-shifting a negative (or too large)
 * signed value is undefined behavior in C; the resulting bit pattern is unchanged for every
 * input the signed form handled.
 */
static int64_t getSessionKey(int64_t session, int64_t type) {
  return (int64_t)((uint64_t)session | ((uint64_t)type << 32));
}
61

62
static int32_t buildScheamFromMeta(SVnode* pVnode, int64_t uid, SArray** schemas);
63

64
/*
 * Three-way comparator over int16_t values.
 * Returns -1, 0 or 1 when *lp is less than, equal to or greater than *rp.
 */
int32_t sortCid(const void *lp, const void *rp) {
  const int16_t lhs = *(const int16_t *)lp;
  const int16_t rhs = *(const int16_t *)rp;

  // (a > b) - (a < b) yields exactly -1 / 0 / 1 without any overflow risk.
  return (lhs > rhs) - (lhs < rhs);
}
76

77
int32_t sortSSchema(const void *lp, const void *rp) {
1,311,340✔
78
  SSchema* c1 = (SSchema*)lp;
1,311,340✔
79
  SSchema* c2 = (SSchema*)rp;
1,311,340✔
80

81
  if (c1->colId < c2->colId) {
1,311,340✔
82
    return -1;
1,297,020✔
83
  } else if (c1->colId > c2->colId) {
14,320✔
84
    return 1;
14,320✔
85
  }
86

87
  return 0;
×
88
}
89

90
/*
 * Copy one fixed-width value into column `index` of pResBlock, at the slot of the block's
 * current row count (info.rows). The row count itself is not changed here.
 *
 * Returns 0 on success, or terrno when the column cannot be fetched.
 */
static int32_t addColData(SSDataBlock* pResBlock, int32_t index, void* data) {
  SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, index);
  if (pCol != NULL) {
    // Slot of the next row: rows * bytes-per-value from the start of the column buffer.
    void* dst = pCol->pData + pResBlock->info.rows * pCol->info.bytes;
    memcpy(dst, data, pCol->info.bytes);
    return 0;
  }

  return terrno;
}
99

100
/*
 * Advance the task's tsdb reader to its next data block; *hasNext is filled in by the reader.
 * When the reader reports an error, the current data block is released before returning it.
 */
static int32_t getTableDataInfo(SStreamReaderTaskInner* pTask, bool* hasNext) {
  const int32_t rc = pTask->api.tsdReader.tsdNextDataBlock(pTask->pReader, hasNext);
  if (TSDB_CODE_SUCCESS == rc) {
    return rc;
  }

  pTask->api.tsdReader.tsdReaderReleaseDataBlock(pTask->pReader);
  return rc;
}
108

109
/* Retrieve the reader's current data block into *ppRes; returns the reader's status code. */
static int32_t getTableData(SStreamReaderTaskInner* pTask, SSDataBlock** ppRes) {
  int32_t rc = pTask->api.tsdReader.tsdReaderRetrieveDataBlock(pTask->pReader, ppRes, NULL);
  return rc;
}
112

113
static int32_t buildOTableInfoRsp(const SSTriggerOrigTableInfoRsp* rsp, void** data, size_t* size) {
362,541✔
114
  int32_t code = 0;
362,541✔
115
  int32_t lino = 0;
362,541✔
116
  void*   buf = NULL;
362,541✔
117
  int32_t len = tSerializeSTriggerOrigTableInfoRsp(NULL, 0, rsp);
362,541✔
118
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
361,131✔
119
  buf = rpcMallocCont(len);
361,131✔
120
  STREAM_CHECK_NULL_GOTO(buf, terrno);
359,697✔
121
  int32_t actLen = tSerializeSTriggerOrigTableInfoRsp(buf, len, rsp);
359,697✔
122
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
363,975✔
123
  *data = buf;
363,975✔
124
  *size = len;
363,975✔
125
  buf = NULL;
363,975✔
126
end:
363,975✔
127
  rpcFreeCont(buf);
363,975✔
128
  return code;
362,541✔
129
}
130

131
/*
 * Decide whether a newly seen table requires the reader's table list to be rebuilt.
 *
 * - Virtual-table stream: true when the (suid, uid) pair is missing from the calc/trigger uid hash
 *   (selected by isCalc).
 * - Otherwise: true only for a child table of the reader's own super table that has no group id
 *   in the current table list yet.
 */
static bool needRefreshTableList(SStreamTriggerReaderInfo* sStreamReaderInfo, int8_t tableType, int64_t suid, int64_t uid, bool isCalc){
  if (sStreamReaderInfo->isVtableStream) {
    int64_t key[2] = {suid, uid};
    return tSimpleHashGet(isCalc ? sStreamReaderInfo->uidHashCalc : sStreamReaderInfo->uidHashTrigger, key, sizeof(key)) == NULL;
  }

  if (tableType != TD_CHILD_TABLE) {
    return false;
  }

  return sStreamReaderInfo->tableType == TD_SUPER_TABLE &&
         suid == sStreamReaderInfo->suid &&
         qStreamGetGroupId(sStreamReaderInfo->tableList, uid) == -1;
}
149

150
/*
 * Check whether table `uid` (with super table `suid`) belongs to this reader and, if the check
 * gets far enough, store the identifier to report for it in *id.
 *
 * - Virtual-table stream: member iff (suid, uid) is in the calc/trigger uid hash; *id = uid.
 * - No table list: never a member.
 * - Non-super-table reader: *id is the group id (or uid when none is found); member iff uid
 *   equals the reader's own uid.
 * - Super-table reader: suid must match. Without a tag condition, *id is 0 when there are no
 *   partition columns and uid when grouping by table name; in every other case *id is looked
 *   up in the table list and the table is a member only if a group id exists.
 */
static bool uidInTableList(SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t suid, int64_t uid, uint64_t* id, bool isCalc){
  if (sStreamReaderInfo->isVtableStream) {
    int64_t key[2] = {suid, uid};
    if (tSimpleHashGet(isCalc ? sStreamReaderInfo->uidHashCalc : sStreamReaderInfo->uidHashTrigger, key, sizeof(key)) == NULL) {
      return false;
    }
    *id = uid;
    return true;
  }

  if (sStreamReaderInfo->tableList == NULL) {
    return false;
  }

  if (sStreamReaderInfo->tableType != TD_SUPER_TABLE) {
    *id = qStreamGetGroupId(sStreamReaderInfo->tableList, uid);
    if (*id == -1) {
      *id = uid;
    }
    return uid == sStreamReaderInfo->uid;
  }

  if (suid != sStreamReaderInfo->suid) {
    return false;
  }

  if (sStreamReaderInfo->pTagCond == NULL) {
    if (sStreamReaderInfo->partitionCols == NULL) {
      *id = 0;
      return true;
    }
    if (sStreamReaderInfo->groupByTbname) {
      *id = uid;
      return true;
    }
  }

  // Tag-filtered or partitioned by something other than the table name:
  // membership is decided by the table list itself.
  *id = qStreamGetGroupId(sStreamReaderInfo->tableList, uid);
  return *id != -1;
}
184

185
/*
 * Build a table list for the stream reader from its suid/uid/table type, its tag conditions
 * and a private clone of its partition columns.
 *
 * isHistory selects the destination: historyTableList (built without a group-id map) or
 * tableList (built with the reader's groupIdMap). The cloned node list is always destroyed
 * before returning.
 */
static int32_t generateTablistForStreamReader(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamReaderInfo, bool isHistory) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SNodeList*  groupNew = NULL;
  SStorageAPI api = {0};

  STREAM_CHECK_RET_GOTO(nodesCloneList(sStreamReaderInfo->partitionCols, &groupNew));

  initStorageAPI(&api);
  if (isHistory) {
    code = qStreamCreateTableListForReader(pVnode, sStreamReaderInfo->suid, sStreamReaderInfo->uid,
                                           sStreamReaderInfo->tableType, groupNew, true, sStreamReaderInfo->pTagCond,
                                           sStreamReaderInfo->pTagIndexCond, &api,
                                           &sStreamReaderInfo->historyTableList, NULL);
  } else {
    code = qStreamCreateTableListForReader(pVnode, sStreamReaderInfo->suid, sStreamReaderInfo->uid,
                                           sStreamReaderInfo->tableType, groupNew, true, sStreamReaderInfo->pTagCond,
                                           sStreamReaderInfo->pTagIndexCond, &api,
                                           &sStreamReaderInfo->tableList, sStreamReaderInfo->groupIdMap);
  }

end:
  nodesDestroyList(groupNew);
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
202

203
static int32_t buildVTableInfoRsp(const SStreamMsgVTableInfo* rsp, void** data, size_t* size) {
1,936,403✔
204
  int32_t code = 0;
1,936,403✔
205
  int32_t lino = 0;
1,936,403✔
206
  void*   buf = NULL;
1,936,403✔
207
  int32_t len = tSerializeSStreamMsgVTableInfo(NULL, 0, rsp);
1,936,403✔
208
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
1,936,403✔
209
  buf = rpcMallocCont(len);
1,936,403✔
210
  STREAM_CHECK_NULL_GOTO(buf, terrno);
1,936,403✔
211
  int32_t actLen = tSerializeSStreamMsgVTableInfo(buf, len, rsp);
1,936,403✔
212
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
1,936,403✔
213
  *data = buf;
1,936,403✔
214
  *size = len;
1,936,403✔
215
  buf = NULL;
1,936,403✔
216
end:
1,936,403✔
217
  rpcFreeCont(buf);
1,936,403✔
218
  return code;
1,936,403✔
219
}
220

221
static int32_t buildTsRsp(const SStreamTsResponse* tsRsp, void** data, size_t* size) {
3,121,631✔
222
  int32_t code = 0;
3,121,631✔
223
  int32_t lino = 0;
3,121,631✔
224
  void*   buf = NULL;
3,121,631✔
225
  int32_t len = tSerializeSStreamTsResponse(NULL, 0, tsRsp);
3,121,631✔
226
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
3,120,194✔
227
  buf = rpcMallocCont(len);
3,120,194✔
228
  STREAM_CHECK_NULL_GOTO(buf, terrno);
3,117,995✔
229
  int32_t actLen = tSerializeSStreamTsResponse(buf, len, tsRsp);
3,117,995✔
230
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
3,121,631✔
231
  *data = buf;
3,121,631✔
232
  *size = len;
3,121,631✔
233
  buf = NULL;
3,118,764✔
234
end:
3,118,764✔
235
  rpcFreeCont(buf);
3,118,764✔
236
  return code;
3,114,356✔
237
}
238

239

240
/*
 * Encode a data block into a freshly allocated RPC buffer.
 *
 * A NULL or empty block is not an error: the function returns success and leaves *data and
 * *size untouched. Otherwise *data receives the encoded buffer and *size the encode size
 * computed by blockGetEncodeSize(). Allocation or encoding failures return terrno.
 */
static int32_t buildRsp(SSDataBlock* pBlock, void** data, size_t* size) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   cont = NULL;

  // Nothing to send for a missing or empty block.
  STREAM_CHECK_CONDITION_GOTO(!(pBlock != NULL && pBlock->info.rows != 0), TSDB_CODE_SUCCESS);

  size_t encodeSize = blockGetEncodeSize(pBlock);
  cont = rpcMallocCont(encodeSize);
  STREAM_CHECK_NULL_GOTO(cont, terrno);

  int32_t encoded = blockEncode(pBlock, cont, encodeSize, taosArrayGetSize(pBlock->pDataBlock));
  STREAM_CHECK_CONDITION_GOTO(encoded < 0, terrno);

  *data = cont;
  *size = encodeSize;
  cont = NULL;  // handed to the caller; keep the cleanup below from freeing it

end:
  rpcFreeCont(cont);
  return code;
}
257

258
/*
 * Point the task's tsdb reader at the tables of the current group and reset its scan state.
 *
 * Steps: fetch the table keys of currentGroupIndex, install them on the reader, rebuild the
 * query condition from the task options, then reset the reader with that condition.
 * An empty table list is reported as TSDB_CODE_INVALID_PARA.
 */
static int32_t resetTsdbReader(SStreamReaderTaskInner* pTask) {
  int32_t        code = 0;
  int32_t        lino = 0;
  int32_t        pNum = 1;
  STableKeyInfo* pList = NULL;
  uint64_t       suid = 0;

  STREAM_CHECK_RET_GOTO(qStreamGetTableList(pTask->pTableList, pTask->currentGroupIndex, &pList, &pNum));
  if (pNum == 0 || pList == NULL) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  STREAM_CHECK_RET_GOTO(pTask->api.tsdReader.tsdSetQueryTableList(pTask->pReader, pList, pNum));

  cleanupQueryTableDataCond(&pTask->cond);

  // Virtual-table streams take the id from the first table key's groupId instead of the option's suid.
  if (pTask->options.sStreamReaderInfo->isVtableStream) {
    suid = pList->groupId;
  } else {
    suid = pTask->options.suid;
  }
  STREAM_CHECK_RET_GOTO(qStreamInitQueryTableDataCond(&pTask->cond, pTask->options.order, pTask->options.schemas, true,
                                                      pTask->options.twindows, suid, pTask->options.ver, NULL));
  STREAM_CHECK_RET_GOTO(pTask->api.tsdReader.tsdReaderResetStatus(pTask->pReader, &pTask->cond));

end:
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
280

281
/*
 * Write one WAL-meta row into pBlock at its current row position.
 * Column order: type, [id — only when !isVTable], uid, skey, ekey, ver, rows.
 * The block's row count is not advanced here. Stops at the first column that fails.
 */
static int32_t buildWalMetaBlock(SSDataBlock* pBlock, int8_t type, int64_t id, bool isVTable, int64_t uid,
                                 int64_t skey, int64_t ekey, int64_t ver, int64_t rows) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   values[7];
  int32_t nValues = 0;

  values[nValues++] = &type;
  if (!isVTable) {
    values[nValues++] = &id;
  }
  values[nValues++] = &uid;
  values[nValues++] = &skey;
  values[nValues++] = &ekey;
  values[nValues++] = &ver;
  values[nValues++] = &rows;

  for (int32_t index = 0; index < nValues; index++) {
    STREAM_CHECK_RET_GOTO(addColData(pBlock, index, values[index]));
  }

end:
  return code;
}
300

301
/*
 * Write one (id, skey, ekey, ver) row into columns 0..3 of pBlock at its current row position.
 * The block's row count is not advanced here. Stops at the first column that fails.
 */
static int32_t buildWalMetaBlockNew(SSDataBlock* pBlock, int64_t id, int64_t skey, int64_t ekey, int64_t ver) {
  int32_t  code = 0;
  int32_t  lino = 0;
  int64_t* values[] = {&id, &skey, &ekey, &ver};

  for (int32_t index = 0; index < 4; index++) {
    STREAM_CHECK_RET_GOTO(addColData(pBlock, index, values[index]));
  }

end:
  return code;
}
313

314
/*
 * Write one (id, ver) row into columns 0..1 of pBlock at its current row position.
 * The block's row count is not advanced here. Stops at the first column that fails.
 */
static int32_t buildDropTableBlock(SSDataBlock* pBlock, int64_t id, int64_t ver) {
  int32_t  code = 0;
  int32_t  lino = 0;
  int64_t* values[] = {&id, &ver};

  for (int32_t index = 0; index < 2; index++) {
    STREAM_CHECK_RET_GOTO(addColData(pBlock, index, values[index]));
  }

end:
  return code;
}
324

325
/* Fill pTSchema as a single-column schema with the given version, column id, type and width. */
static void buildTSchema(STSchema* pTSchema, int32_t ver, col_id_t colId, int8_t type, int32_t bytes) {
  // Schema-level fields.
  pTSchema->version = ver;
  pTSchema->numOfCols = 1;

  // The one and only column.
  pTSchema->columns[0].bytes = bytes;
  pTSchema->columns[0].type = type;
  pTSchema->columns[0].colId = colId;
}
332

333
/*
 * Decode a WAL delete record and append one (id, skey, ekey, ver) row to rsp->deleteBlock for
 * every deleted table that belongs to this reader.
 *
 * - For a non-virtual super-table reader, a record for a different super table is ignored.
 * - A uid that is not in the reader's table list is skipped and the remaining uids are still
 *   processed (previously the first such uid aborted the whole record, silently dropping
 *   deletes for the tables listed after it; scanDropTableNew already skips per entry).
 * - A failed allocation of the uid array is reported as terrno instead of being passed on
 *   to the decoder as NULL.
 *
 * rsp->totalRows and the delete block's row count grow by one per appended row.
 */
static int32_t scanDeleteDataNew(SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* rsp, void* data, int32_t len,
                              int64_t ver) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SDecoder   decoder = {0};
  SDeleteRes req = {0};
  void* pTask = sStreamReaderInfo->pTask;

  tDecoderInit(&decoder, data, len);
  req.uidList = taosArrayInit(0, sizeof(tb_uid_t));
  STREAM_CHECK_NULL_GOTO(req.uidList, terrno);
  STREAM_CHECK_RET_GOTO(tDecodeDeleteRes(&decoder, &req));
  STREAM_CHECK_CONDITION_GOTO((sStreamReaderInfo->tableType == TSDB_SUPER_TABLE && !sStreamReaderInfo->isVtableStream && req.suid != sStreamReaderInfo->suid), TDB_CODE_SUCCESS);

  for (int32_t i = 0; i < taosArrayGetSize(req.uidList); i++) {
    uint64_t* uid = taosArrayGet(req.uidList, i);
    STREAM_CHECK_NULL_GOTO(uid, terrno);
    uint64_t   id = 0;
    ST_TASK_ILOG("stream reader scan delete start data:uid %" PRIu64 ", skey %" PRIu64 ", ekey %" PRIu64, *uid, req.skey, req.ekey);
    if (!uidInTableList(sStreamReaderInfo, req.suid, *uid, &id, false)) {
      // Not one of our tables: skip only this uid, later uids may still match.
      continue;
    }
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->deleteBlock, ((SSDataBlock*)rsp->deleteBlock)->info.rows + 1));
    STREAM_CHECK_RET_GOTO(buildWalMetaBlockNew(rsp->deleteBlock, id, req.skey, req.ekey, ver));
    ((SSDataBlock*)rsp->deleteBlock)->info.rows++;
    rsp->totalRows++;
  }

end:
  taosArrayDestroy(req.uidList);
  tDecoderClear(&decoder);
  return code;
}
363

364
/*
 * Decode a WAL drop-table batch and append one (id, ver) row to rsp->dropBlock for every
 * dropped table that belongs to this reader. Tables outside the reader's table list are
 * skipped. rsp->totalRows and the drop block's row count grow by one per appended row.
 */
static int32_t scanDropTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* rsp, void* data, int32_t len,
                             int64_t ver) {
  int32_t          code = 0;
  int32_t          lino = 0;
  SDecoder         decoder = {0};
  SVDropTbBatchReq req = {0};
  void*            pTask = sStreamReaderInfo->pTask;

  tDecoderInit(&decoder, data, len);
  STREAM_CHECK_RET_GOTO(tDecodeSVDropTbBatchReq(&decoder, &req));

  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    SVDropTbReq* pDropTbReq = req.pReqs + iReq;
    STREAM_CHECK_NULL_GOTO(pDropTbReq, TSDB_CODE_INVALID_PARA);

    uint64_t id = 0;
    if (uidInTableList(sStreamReaderInfo, pDropTbReq->suid, pDropTbReq->uid, &id, false)) {
      STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->dropBlock, ((SSDataBlock*)rsp->dropBlock)->info.rows + 1));
      STREAM_CHECK_RET_GOTO(buildDropTableBlock(rsp->dropBlock, id, ver));
      ((SSDataBlock*)rsp->dropBlock)->info.rows++;
      rsp->totalRows++;
      ST_TASK_ILOG("stream reader scan drop uid %" PRId64 ", id %" PRIu64, pDropTbReq->uid, id);
    }
  }

end:
  tDecoderClear(&decoder);
  return code;
}
394

395
/*
 * Rebuild both table lists of the reader while holding its mutex.
 *
 * The live list is destroyed and regenerated first; the history list is only destroyed and
 * regenerated if that succeeded. Returns the status of the last generation attempted.
 */
static int32_t reloadTableList(SStreamTriggerReaderInfo* sStreamReaderInfo){
  int32_t code = 0;

  (void)taosThreadMutexLock(&sStreamReaderInfo->mutex);

  qStreamDestroyTableList(sStreamReaderInfo->tableList);
  sStreamReaderInfo->tableList = NULL;
  code = generateTablistForStreamReader(sStreamReaderInfo->pVnode, sStreamReaderInfo, false);
  if (code != 0) {
    goto unlock;
  }

  qStreamDestroyTableList(sStreamReaderInfo->historyTableList);
  sStreamReaderInfo->historyTableList = NULL;
  code = generateTablistForStreamReader(sStreamReaderInfo->pVnode, sStreamReaderInfo, true);

unlock:
  (void)taosThreadMutexUnlock(&sStreamReaderInfo->mutex);
  return code;
}
408

409
/*
 * Decode a WAL create-table batch and reload the reader's table lists if at least one of the
 * created tables is relevant to this reader (see needRefreshTableList). Scanning stops at the
 * first relevant table; irrelevant ones before it are only logged.
 */
static int32_t scanCreateTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, void* data, int32_t len) {
  int32_t            code = 0;
  int32_t            lino = 0;
  bool               found = false;
  SDecoder           decoder = {0};
  SVCreateTbBatchReq req = {0};
  void*              pTask = sStreamReaderInfo->pTask;

  tDecoderInit(&decoder, data, len);
  STREAM_CHECK_RET_GOTO(tDecodeSVCreateTbBatchReq(&decoder, &req));

  for (int32_t iReq = 0; !found && iReq < req.nReqs; iReq++) {
    SVCreateTbReq* pCreateReq = req.pReqs + iReq;
    found = needRefreshTableList(sStreamReaderInfo, pCreateReq->type, pCreateReq->ctb.suid, pCreateReq->uid, false);
    if (found) {
      ST_TASK_ILOG("stream reader scan create table %s", pCreateReq->name);
    } else {
      ST_TASK_ILOG("stream reader scan create table jump, %s", pCreateReq->name);
    }
  }
  STREAM_CHECK_CONDITION_GOTO(!found, TDB_CODE_SUCCESS);

  STREAM_CHECK_RET_GOTO(reloadTableList(sStreamReaderInfo));

end:
  tDeleteSVCreateTbBatchReq(&req);
  tDecoderClear(&decoder);
  return code;
}
441

442
/*
 * Handle a table that is auto-created as part of a submit request: reload the reader's table
 * lists when the new table is relevant to this reader, otherwise just log and return success.
 */
static int32_t processAutoCreateTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, SVCreateTbReq* pCreateReq) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   pTask = sStreamReaderInfo->pTask;

  if (needRefreshTableList(sStreamReaderInfo, pCreateReq->type, pCreateReq->ctb.suid, pCreateReq->uid, false)) {
    ST_TASK_ILOG("stream reader scan auto create table %s", pCreateReq->name);
    STREAM_CHECK_RET_GOTO(reloadTableList(sStreamReaderInfo));
  } else {
    ST_TASK_DLOG("stream reader scan auto create table jump, %s", pCreateReq->name);
  }

end:
  return code;
}
456

457
/*
 * Decode a WAL alter-table record and reload the reader's table lists when it updates tag
 * value(s) of a child table that belongs to the reader's super table. Every other alter
 * action, table type or super table is ignored with success.
 */
static int32_t scanAlterTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, void* data, int32_t len) {
  int32_t      code = 0;
  int32_t      lino = 0;
  ETableType   tbType = 0;
  uint64_t     suid = 0;
  SDecoder     decoder = {0};
  SVAlterTbReq req = {0};
  void*        pTask = sStreamReaderInfo->pTask;

  tDecoderInit(&decoder, data, len);
  STREAM_CHECK_RET_GOTO(tDecodeSVAlterTbReq(&decoder, &req));

  // Only tag-value updates can change which tables match the reader.
  STREAM_CHECK_CONDITION_GOTO(!(req.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL || req.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL), TDB_CODE_SUCCESS);

  STREAM_CHECK_RET_GOTO(metaGetTableTypeSuidByName(sStreamReaderInfo->pVnode, req.tbName, &tbType, &suid));
  STREAM_CHECK_CONDITION_GOTO(tbType != TSDB_CHILD_TABLE || suid != sStreamReaderInfo->suid, TDB_CODE_SUCCESS);

  STREAM_CHECK_RET_GOTO(reloadTableList(sStreamReaderInfo));
  ST_TASK_ILOG("stream reader scan alter table %s", req.tbName);

end:
  taosArrayDestroy(req.pMultiTag);
  tDecoderClear(&decoder);
  return code;
}
483

484
// static int32_t scanAlterSTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, void* data, int32_t len) {
485
//   int32_t  code = 0;
486
//   int32_t  lino = 0;
487
//   SDecoder decoder = {0};
488
//   SMAlterStbReq reqAlter = {0};
489
//   SVCreateStbReq req = {0};
490
//   tDecoderInit(&decoder, data, len);
491
//   void* pTask = sStreamReaderInfo->pTask;
492
  
493
//   STREAM_CHECK_RET_GOTO(tDecodeSVCreateStbReq(&decoder, &req));
494
//   STREAM_CHECK_CONDITION_GOTO(req.suid != sStreamReaderInfo->suid, TDB_CODE_SUCCESS);
495
//   if (req.alterOriData != 0) {
496
//     STREAM_CHECK_RET_GOTO(tDeserializeSMAlterStbReq(req.alterOriData, req.alterOriDataLen, &reqAlter));
497
//     STREAM_CHECK_CONDITION_GOTO(reqAlter.alterType != TSDB_ALTER_TABLE_DROP_TAG && reqAlter.alterType != TSDB_ALTER_TABLE_UPDATE_TAG_NAME, TDB_CODE_SUCCESS);
498
//   }
499
  
500
//   STREAM_CHECK_RET_GOTO(reloadTableList(sStreamReaderInfo));
501

502
//   ST_TASK_ILOG("stream reader scan alter suid %" PRId64, req.suid);
503
// end:
504
//   tFreeSMAltertbReq(&reqAlter);
505
//   tDecoderClear(&decoder);
506
//   return code;
507
// }
508

509
/*
 * Decode a WAL drop-super-table record and reload the reader's table lists when the dropped
 * super table is the one this reader is bound to; any other super table is ignored.
 */
static int32_t scanDropSTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, void* data, int32_t len) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SDecoder     decoder = {0};
  SVDropStbReq req = {0};
  void*        pTask = sStreamReaderInfo->pTask;

  tDecoderInit(&decoder, data, len);
  STREAM_CHECK_RET_GOTO(tDecodeSVDropStbReq(&decoder, &req));

  // Someone else's super table: nothing to do.
  STREAM_CHECK_CONDITION_GOTO(sStreamReaderInfo->suid != req.suid, TDB_CODE_SUCCESS);

  STREAM_CHECK_RET_GOTO(reloadTableList(sStreamReaderInfo));
  ST_TASK_ILOG("stream reader scan drop suid %" PRId64, req.suid);

end:
  tDecoderClear(&decoder);
  return code;
}
526

527
/*
 * Decode one per-table entry of a submit message and fold its time range into gidHash.
 *
 * Flow:
 *  1. Read the flags word (low byte = flags, next byte = encoding version).
 *  2. If the entry auto-creates its table, decode the create request and let
 *     processAutoCreateTableNew() refresh the table list when needed.
 *  3. Read suid/uid; entries whose table is not in the reader's table list are skipped.
 *  4. Take the first and last row timestamps (column format: first/last value of the first
 *     column; row format: ts of the first/last SRow) as skey/ekey.
 *  5. Merge into gidHash keyed by the id returned by uidInTableList(): widen an existing
 *     range, or insert a new WalMetaResult.
 *
 * Any decode failure yields TSDB_CODE_INVALID_MSG. The create request (if any) is always
 * released and tEndDecode() is always called before returning.
 */
static int32_t scanSubmitTbDataForMeta(SDecoder *pCoder, SStreamTriggerReaderInfo* sStreamReaderInfo, SSHashObj* gidHash) {
  int32_t       code = 0;
  int32_t       lino = 0;
  uint8_t       version = 0;
  WalMetaResult walMeta = {0};
  SSubmitTbData submitTbData = {0};

  if (tStartDecode(pCoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto end;
  }

  if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto end;
  }
  version = (submitTbData.flags >> 8) & 0xff;
  submitTbData.flags = submitTbData.flags & 0xff;

  if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
    submitTbData.pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
    STREAM_CHECK_NULL_GOTO(submitTbData.pCreateTbReq, terrno);
    STREAM_CHECK_RET_GOTO(tDecodeSVCreateTbReq(pCoder, submitTbData.pCreateTbReq));
    STREAM_CHECK_RET_GOTO(processAutoCreateTableNew(sStreamReaderInfo, submitTbData.pCreateTbReq));
  }

  // Table identity of the submitted data.
  if (tDecodeI64(pCoder, &submitTbData.suid) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto end;
  }
  if (tDecodeI64(pCoder, &submitTbData.uid) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto end;
  }

  // Data for tables this reader does not follow is skipped without error.
  if (!uidInTableList(sStreamReaderInfo, submitTbData.suid, submitTbData.uid, &walMeta.id, false)) {
    goto end;
  }

  if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto end;
  }

  if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    uint64_t nColData = 0;
    if (tDecodeU64v(pCoder, &nColData) < 0) {
      code = TSDB_CODE_INVALID_MSG;
      goto end;
    }

    // The first column carries the timestamps and must contain values only.
    SColData colData = {0};
    if (tDecodeColData(version, pCoder, &colData, false) != 0) {
      code = TSDB_CODE_INVALID_MSG;
      goto end;
    }
    if (colData.flag != HAS_VALUE) {
      code = TSDB_CODE_INVALID_MSG;
      goto end;
    }
    walMeta.skey = ((TSKEY *)colData.pData)[0];
    walMeta.ekey = ((TSKEY *)colData.pData)[colData.nVal - 1];

    // Walk over the remaining columns so the decoder position stays consistent.
    for (uint64_t i = 1; i < nColData; i++) {
      if (tDecodeColData(version, pCoder, &colData, true) != 0) {
        code = TSDB_CODE_INVALID_MSG;
        goto end;
      }
    }
  } else {
    uint64_t nRow = 0;
    if (tDecodeU64v(pCoder, &nRow) < 0) {
      code = TSDB_CODE_INVALID_MSG;
      goto end;
    }

    for (int32_t iRow = 0; iRow < nRow; ++iRow) {
      SRow *pRow = (SRow *)(pCoder->data + pCoder->pos);
      pCoder->pos += pRow->len;

      const bool isFirst = (iRow == 0);
      const bool isLast = (iRow == nRow - 1);
      if (isFirst || isLast) {
        // Only the boundary rows contribute to the time range.
#ifndef NO_UNALIGNED_ACCESS
        int64_t ts = pRow->ts;
#else
        int64_t ts = taosGetInt64Aligned(&pRow->ts);
#endif
        if (isFirst) {
          walMeta.skey = ts;
        }
        if (isLast) {
          walMeta.ekey = ts;
        }
      }
    }
  }

  WalMetaResult* pPrev = (WalMetaResult*)tSimpleHashGet(gidHash, &walMeta.id, LONG_BYTES);
  if (pPrev == NULL) {
    STREAM_CHECK_RET_GOTO(tSimpleHashPut(gidHash, &walMeta.id, LONG_BYTES, &walMeta, sizeof(WalMetaResult)));
  } else {
    // Same id seen earlier in this message: widen its range.
    if (walMeta.skey < pPrev->skey) pPrev->skey = walMeta.skey;
    if (walMeta.ekey > pPrev->ekey) pPrev->ekey = walMeta.ekey;
  }

end:
  tDestroySVSubmitCreateTbReq(submitTbData.pCreateTbReq, TSDB_MSG_FLG_DECODE);
  taosMemoryFreeClear(submitTbData.pCreateTbReq);
  tEndDecode(pCoder);
  return code;
}
641

642
/*
 * Decode a WAL submit message and append one (id, skey, ekey, ver) row to rsp->metaBlock for
 * every distinct id that received data in it.
 *
 * Each per-table entry is folded into a temporary hash by scanSubmitTbDataForMeta(); the hash
 * is then flushed into the meta block. rsp->totalRows and the block's row count grow by one
 * per appended row. Decode failures yield TSDB_CODE_INVALID_MSG.
 */
static int32_t scanSubmitDataForMeta(SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* rsp, void* data, int32_t len, int64_t ver) {
  int32_t    code = 0;
  int32_t    lino = 0;
  int32_t    iter = 0;
  uint64_t   nSubmitTbData = 0;
  SDecoder   decoder = {0};
  SSHashObj* gidHash = NULL;
  void*      pTask = sStreamReaderInfo->pTask;

  tDecoderInit(&decoder, data, len);
  if (tStartDecode(&decoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto end;
  }

  if (tDecodeU64v(&decoder, &nSubmitTbData) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto end;
  }

  gidHash = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
  STREAM_CHECK_NULL_GOTO(gidHash, terrno);

  for (int32_t i = 0; i < nSubmitTbData; i++) {
    STREAM_CHECK_RET_GOTO(scanSubmitTbDataForMeta(&decoder, sStreamReaderInfo, gidHash));
  }
  tEndDecode(&decoder);

  // Reserve room for one output row per collected id, then flush the hash.
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->metaBlock, ((SSDataBlock*)rsp->metaBlock)->info.rows + tSimpleHashGetSize(gidHash)));
  for (void* px = tSimpleHashIterate(gidHash, NULL, &iter); px != NULL; px = tSimpleHashIterate(gidHash, px, &iter)) {
    WalMetaResult* pMeta = (WalMetaResult*)px;
    STREAM_CHECK_RET_GOTO(buildWalMetaBlockNew(rsp->metaBlock, pMeta->id, pMeta->skey, pMeta->ekey, ver));
    ((SSDataBlock*)rsp->metaBlock)->info.rows++;
    rsp->totalRows++;
    ST_TASK_DLOG("stream reader scan submit data:skey %" PRId64 ", ekey %" PRId64 ", id %" PRIu64
          ", ver:%"PRId64, pMeta->skey, pMeta->ekey, pMeta->id, ver);
  }

end:
  tDecoderClear(&decoder);
  tSimpleHashCleanup( gidHash);
  return code;
}
686

687
// Creates the data block that carries TSDB meta rows.
// Columns are registered in this order: skey, ekey, uid, gid (only when !isVTable), nrows.
// The index argument handed to qStreamBuildSchema() starts at 1 and grows by one per
// registered column. The temporary schema array is released on every path.
// Returns 0 on success, otherwise the first error code reported by a helper.
static int32_t createBlockForTsdbMeta(SSDataBlock** pBlock, bool isVTable) {
  int32_t code = 0;
  int32_t lino = 0;

  SArray* cols = taosArrayInit(8, sizeof(SSchema));
  STREAM_CHECK_NULL_GOTO(cols, terrno);

  // Position of the gid column in colTypes; it is left out for virtual tables.
  enum { GID_POS = 3 };
  const int8_t colTypes[] = {
      TSDB_DATA_TYPE_TIMESTAMP,  // skey
      TSDB_DATA_TYPE_TIMESTAMP,  // ekey
      TSDB_DATA_TYPE_BIGINT,     // uid
      TSDB_DATA_TYPE_UBIGINT,    // gid
      TSDB_DATA_TYPE_BIGINT,     // nrows
  };
  const int32_t nTypes = (int32_t)(sizeof(colTypes) / sizeof(colTypes[0]));

  int32_t slot = 1;
  for (int32_t pos = 0; pos < nTypes; ++pos) {
    if (isVTable && pos == GID_POS) {
      continue;
    }
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(cols, colTypes[pos], LONG_BYTES, slot));
    ++slot;
  }

  STREAM_CHECK_RET_GOTO(createDataBlockForStream(cols, pBlock));

end:
  taosArrayDestroy(cols);
  return code;
}
708

709
// Creates the data block that carries WAL meta rows.
// Four BIGINT columns are registered with indexes 0..3:
//   0: gid (non vtable) / uid (vtable), 1: skey, 2: ekey, 3: ver.
// The temporary schema array is released on every path.
static int32_t createBlockForWalMetaNew(SSDataBlock** pBlock) {
  int32_t code = 0;
  int32_t lino = 0;

  SArray* cols = taosArrayInit(8, sizeof(SSchema));
  STREAM_CHECK_NULL_GOTO(cols, terrno);

  enum { WAL_META_COLS = 4 };
  for (int32_t slot = 0; slot < WAL_META_COLS; ++slot) {
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(cols, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, slot));
  }

  STREAM_CHECK_RET_GOTO(createDataBlockForStream(cols, pBlock));

end:
  taosArrayDestroy(cols);
  return code;
}
729

730
// Creates the data block that carries drop-table rows.
// Two BIGINT columns are registered with indexes 0..1:
//   0: gid (non vtable) / uid (vtable), 1: ver.
// The temporary schema array is released on every path.
static int32_t createBlockForDropTable(SSDataBlock** pBlock) {
  int32_t code = 0;
  int32_t lino = 0;

  SArray* cols = taosArrayInit(8, sizeof(SSchema));
  STREAM_CHECK_NULL_GOTO(cols, terrno);

  enum { DROP_TABLE_COLS = 2 };
  for (int32_t slot = 0; slot < DROP_TABLE_COLS; ++slot) {
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(cols, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, slot));
  }

  STREAM_CHECK_RET_GOTO(createDataBlockForStream(cols, pBlock));

end:
  taosArrayDestroy(cols);
  return code;
}
748

749
// Dispatches one non-submit WAL record to the matching scan routine.
//   msgType            WAL message type of the record
//   sStreamReaderInfo  reader state; deleteReCalc / deleteOutTbl gate the delete and
//                      drop-table handling
//   data, len          record payload (the caller has already skipped the message head)
//   rsp                response being built; deleteBlock / dropBlock are created lazily here
//   ver                WAL version forwarded to the delete / drop-table scanners
// Message types not listed below are ignored. Returns 0 on success or the first error.
//
// NOTE(review): `ver` is int32_t while the visible caller passes an int64_t WAL version,
// so the value is narrowed on the way in - confirm whether the parameter should be widened.
static int32_t processMeta(int16_t msgType, SStreamTriggerReaderInfo* sStreamReaderInfo, void *data, int32_t len, SSTriggerWalNewRsp* rsp, int32_t ver) {
  int32_t code = 0;
  int32_t lino = 0;

  if (msgType == TDMT_VND_DELETE && sStreamReaderInfo->deleteReCalc != 0) {
    if (rsp->deleteBlock == NULL) {
      STREAM_CHECK_RET_GOTO(createBlockForWalMetaNew((SSDataBlock**)&rsp->deleteBlock));
    }
    STREAM_CHECK_RET_GOTO(scanDeleteDataNew(sStreamReaderInfo, rsp, data, len, ver));
  } else if (msgType == TDMT_VND_DROP_TABLE && sStreamReaderInfo->deleteOutTbl != 0) {
    if (rsp->dropBlock == NULL) {
      STREAM_CHECK_RET_GOTO(createBlockForDropTable((SSDataBlock**)&rsp->dropBlock));
    }
    STREAM_CHECK_RET_GOTO(scanDropTableNew(sStreamReaderInfo, rsp, data, len, ver));
  } else if (msgType == TDMT_VND_DROP_STB) {
    STREAM_CHECK_RET_GOTO(scanDropSTableNew(sStreamReaderInfo, data, len));
  } else if (msgType == TDMT_VND_CREATE_TABLE) {
    STREAM_CHECK_RET_GOTO(scanCreateTableNew(sStreamReaderInfo, data, len));
  } else if (msgType == TDMT_VND_ALTER_STB) {
    // Currently not processed: the scanAlterSTableNew() call is disabled.
  } else if (msgType == TDMT_VND_ALTER_TABLE) {
    STREAM_CHECK_RET_GOTO(scanAlterTableNew(sStreamReaderInfo, data, len));
  }

end:
  return code;
}
779
// Walks the vnode WAL from rsp->ver onwards and folds every record into rsp.
//   - Submit records go to scanSubmitDataForMeta(), everything else to processMeta().
//   - rsp->ver / rsp->verTime are updated for each record that is read.
//   - The walk stops when the WAL has no further entry, when a record's ingestTs / 1000
//     is greater than ctime, when rsp->totalRows reaches STREAM_RETURN_ROWS_NUM, or on
//     the first error.
// TSDB_CODE_WAL_LOG_NOT_EXIST is not an error here: it is logged and mapped to success.
// If the initial seek reports it and rsp->ver is below the first WAL version, rsp->ver
// is moved up to that first version.
static int32_t processWalVerMetaNew(SVnode* pVnode, SSTriggerWalNewRsp* rsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
                       int64_t ctime) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   pTask = sStreamReaderInfo->pTask;  // presumably consumed by the ST_TASK_* log macros

  SWalReader* reader = walOpenReader(pVnode->pWal, 0);
  STREAM_CHECK_NULL_GOTO(reader, terrno);

  code = walReaderSeekVer(reader, rsp->ver);
  if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) {
    if (rsp->ver < walGetFirstVer(reader->pWal)) {
      rsp->ver = walGetFirstVer(reader->pWal);
    }
    ST_TASK_DLOG("vgId:%d %s scan wal error:%s", TD_VID(pVnode), __func__, tstrerror(code));
    code = TSDB_CODE_SUCCESS;
    goto end;
  }
  STREAM_CHECK_RET_GOTO(code);

  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->metaBlock, STREAM_RETURN_ROWS_NUM));

  for (;;) {
    code = walNextValidMsg(reader, true);
    if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) {
      ST_TASK_DLOG("vgId:%d %s scan wal error:%s", TD_VID(pVnode), __func__, tstrerror(code));
      code = TSDB_CODE_SUCCESS;
      break;
    }
    STREAM_CHECK_RET_GOTO(code);

    // Record the position first so the response reflects the last entry that was read,
    // even when the ctime cut-off below stops the walk.
    rsp->ver = reader->curVersion;
    SWalCont* entry = &reader->pHead->head;
    rsp->verTime = entry->ingestTs;
    if (entry->ingestTs / 1000 > ctime) {
      break;
    }

    const int64_t walVer = entry->version;
    ST_TASK_DLOG("vgId:%d stream reader scan wal ver:%" PRId64 ", type:%d, deleteData:%d, deleteTb:%d",
      TD_VID(pVnode), walVer, entry->msgType, sStreamReaderInfo->deleteReCalc, sStreamReaderInfo->deleteOutTbl);

    if (entry->msgType == TDMT_VND_SUBMIT) {
      // Submit payload starts after the SSubmitReq2Msg header.
      void*   payload = POINTER_SHIFT(entry->body, sizeof(SSubmitReq2Msg));
      int32_t payloadLen = entry->bodyLen - sizeof(SSubmitReq2Msg);
      STREAM_CHECK_RET_GOTO(scanSubmitDataForMeta(sStreamReaderInfo, rsp, payload, payloadLen, walVer));
    } else {
      // All other payloads start after the generic SMsgHead.
      void*   payload = POINTER_SHIFT(entry->body, sizeof(SMsgHead));
      int32_t payloadLen = entry->bodyLen - sizeof(SMsgHead);
      STREAM_CHECK_RET_GOTO(processMeta(entry->msgType, sStreamReaderInfo, payload, payloadLen, rsp, walVer));
    }

    if (rsp->totalRows >= STREAM_RETURN_ROWS_NUM) {
      break;
    }
  }

end:
  walCloseReader(reader);
  return code;
}
834

835
// Fills the tag columns and the tbname pseudo column of pBlock for rows
// [currentRow, currentRow + numOfRows) of the table identified by uid.
//   info / isCalc   select the calc or trigger variant of the per-uid cache, the tag
//                   expression list and its length
//   api             storage API used to read the table entry on a cache miss
//   numOfBlocks     forwarded unchanged to colDataSetNItems()
// On a cache miss the table entry is read through the meta reader, each expression's
// value is copied into a new per-uid array, and that array is stored in the cache.
// On a cache hit the stored array must hold exactly numOfExpr entries, otherwise
// TSDB_CODE_INVALID_PARA is returned.
// Returns 0 on success (also when there are no tag expressions) or the first error.
static int32_t processTag(SVnode* pVnode, SStreamTriggerReaderInfo* info, bool isCalc, SStorageAPI* api,
  uint64_t uid, SSDataBlock* pBlock, uint32_t currentRow, uint32_t numOfRows, uint32_t numOfBlocks) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SMetaReader mr = {0};
  SArray* tagCache = NULL;

  SHashObj* metaCache = isCalc ? info->pTableMetaCacheCalc : info->pTableMetaCacheTrigger;
  SExprInfo*   pExprInfo = isCalc ? info->pExprInfoCalcTag : info->pExprInfoTriggerTag;
  int32_t      numOfExpr = isCalc ? info->numOfExprCalcTag : info->numOfExprTriggerTag;
  if (numOfExpr == 0) {
    return TSDB_CODE_SUCCESS;
  }

  // uidData == NULL means "cache miss" for the rest of the function.
  void* uidData = taosHashGet(metaCache, &uid, LONG_BYTES);
  if (uidData == NULL) {
    api->metaReaderFn.initReader(&mr, pVnode, META_READER_LOCK, &api->metaFn);
    code = api->metaReaderFn.getEntryGetUidCache(&mr, uid);
    // The lock is released right after the lookup; mr itself is cleared at `end`.
    api->metaReaderFn.readerReleaseLock(&mr);
    STREAM_CHECK_RET_GOTO(code);

    tagCache = taosArrayInit(numOfExpr, POINTER_BYTES);
    STREAM_CHECK_NULL_GOTO(tagCache, terrno);
    if(taosHashPut(metaCache, &uid, LONG_BYTES, &tagCache, POINTER_BYTES) != 0) {
      taosArrayDestroyP(tagCache, taosMemFree);
      code = terrno;
      goto end;
    }
  } else {
    tagCache = *(SArray**)uidData;
    STREAM_CHECK_CONDITION_GOTO(taosArrayGetSize(tagCache) != numOfExpr, TSDB_CODE_INVALID_PARA);
  }

  for (int32_t j = 0; j < numOfExpr; ++j) {
    const SExprInfo* pExpr1 = &pExprInfo[j];
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
    STREAM_CHECK_NULL_GOTO(pColInfoData, terrno);
    int32_t functionId = pExpr1->pExpr->_function.functionId;

    // this is to handle the tbname
    if (fmIsScanPseudoColumnFunc(functionId)) {
      int32_t fType = pExpr1->pExpr->_function.functionType;
      if (fType == FUNCTION_TYPE_TBNAME) {
        char   buf[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
        if (uidData == NULL) {
          STR_TO_VARSTR(buf, mr.me.name)
          // The cache keeps its own copy of the table name.
          char* tbname = taosStrdup(mr.me.name);
          STREAM_CHECK_NULL_GOTO(tbname, terrno);
          STREAM_CHECK_NULL_GOTO(taosArrayPush(tagCache, &tbname), terrno);
        } else {
          char* tbname = taosArrayGetP(tagCache, j);
          STR_TO_VARSTR(buf, tbname)
        }
        code = colDataSetNItems(pColInfoData, currentRow, buf, numOfRows, numOfBlocks, false);
        pColInfoData->info.colId = -1;
        // Propagate a failed write instead of letting a later iteration overwrite `code`.
        STREAM_CHECK_RET_GOTO(code);
      }
    } else {  // these are tags
      char* data = NULL;
      const char* p = NULL;
      STagVal tagVal = {0};
      if (uidData == NULL) {
        tagVal.cid = pExpr1->base.pParam[0].pCol->colId;
        p = api->metaFn.extractTagVal(mr.me.ctbEntry.pTags, pColInfoData->info.type, &tagVal);

        if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
          data = tTagValToData((const STagVal*)p, false);
        } else {
          data = (char*)p;
        }

        if (data == NULL) {
          // A NULL entry is cached so that later hits also see the tag as NULL.
          STREAM_CHECK_NULL_GOTO(taosArrayPush(tagCache, &data), terrno);
        } else {
          int32_t len = pColInfoData->info.bytes;
          if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
            len = calcStrBytesByType(pColInfoData->info.type, (char*)data);
          }
          // The cache stores a private copy of the value.
          char* pData = taosMemoryCalloc(1, len);
          STREAM_CHECK_NULL_GOTO(pData, terrno);
          (void)memcpy(pData, data, len);
          STREAM_CHECK_NULL_GOTO(taosArrayPush(tagCache, &pData), terrno);
        }
      } else {
        data = taosArrayGetP(tagCache, j);
      }

      bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
      if (isNullVal) {
        colDataSetNNULL(pColInfoData, currentRow, numOfRows);
      } else {
        for (uint32_t i = 0; i < numOfRows && !IS_VAR_DATA_TYPE(pColInfoData->info.type); i++){
          colDataClearNull_f(pColInfoData->nullbitmap, currentRow + i);
        }
        code = colDataSetNItems(pColInfoData, currentRow, data, numOfRows, numOfBlocks, false);
        // On a cache miss, the buffer returned by tTagValToData() for a variable-length
        // tag is released here, before the result of the write is checked.
        if (uidData == NULL && pColInfoData->info.type != TSDB_DATA_TYPE_JSON && IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
          taosMemoryFree(data);
        }
        STREAM_CHECK_RET_GOTO(code);
      }
    }
  }

end:
  api->metaReaderFn.clearReader(&mr);
  return code;
}
945

946
// Computes the half-open row range [*rowStart, *rowEnd) of pCol selected by `window`.
//   window == NULL  -> the whole column: *rowStart = 0, *rowEnd = pCol->nVal.
//   otherwise       -> *rowStart is the first row whose value is >= window->skey and
//                      *rowEnd the first row whose value is > window->ekey (pCol->nVal
//                      when no such row exists).
// *nRows receives *rowEnd - *rowStart. When no row is >= skey, or both bounds fall on
// the same row, *nRows stays 0 and the bounds are left as found (-1 for "not found").
// Returns 0, or the error reported by tColDataGetValue().
int32_t getRowRange(SColData* pCol, STimeWindow* window, int32_t* rowStart, int32_t* rowEnd, int32_t* nRows) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t first = 0;
  int32_t last = pCol->nVal;
  int32_t count = 0;
  SColVal cell = {0};

  if (window == NULL) {
    count = last - first;
    goto end;
  }

  first = -1;
  last = -1;
  for (int32_t row = 0; row < pCol->nVal; ++row) {
    STREAM_CHECK_RET_GOTO(tColDataGetValue(pCol, row, &cell));
    int64_t ts = VALUE_GET_TRIVIAL_DATUM(&cell.value);
    if (first == -1 && ts >= window->skey) {
      first = row;
    }
    if (last == -1 && ts > window->ekey) {
      last = row;
    }
  }

  // Nothing selected: leave count at 0 and report success.
  STREAM_CHECK_CONDITION_GOTO(first == -1 || first == last, TDB_CODE_SUCCESS);

  if (last == -1) {
    last = pCol->nVal;
  }
  count = last - first;

end:
  *rowStart = first;
  *rowEnd = last;
  *nRows = count;
  return code;
}
977

978
// Copies source rows [rowStart, rowEnd) of colData into pColData.
//   rows      destination row index that receives source row `rowStart`; the following
//             source rows are written to consecutive destination rows
//   rowStart  first source row to copy (inclusive)
//   rowEnd    last source row to copy (exclusive)
// A source cell that is not a value is written as NULL.
// Returns 0 on success or the first error from tColDataGetValue() / colDataSetVal().
//
// The destination offset is relative to rowStart: the callers visible in this file pass
// the block's start row and fill their other columns densely from that row, so writing
// at `rows + k` would shift the copied values by rowStart whenever leading rows are
// filtered out.
static int32_t setColData(int64_t rows, int32_t rowStart, int32_t rowEnd, SColData* colData, SColumnInfoData* pColData) {
  int32_t code = 0;
  int32_t lino = 0;
  for (int32_t k = rowStart; k < rowEnd; k++) {
    SColVal colVal = {0};
    STREAM_CHECK_RET_GOTO(tColDataGetValue(colData, k, &colVal));
    STREAM_CHECK_RET_GOTO(colDataSetVal(pColData, rows + (k - rowStart), VALUE_GET_DATUM(&colVal.value, colVal.value.type),
                                        !COL_VAL_IS_VALUE(&colVal)));
  }
end:
  return code;
}
990

991
static int32_t scanSubmitTbData(SVnode* pVnode, SDecoder *pCoder, SStreamTriggerReaderInfo* sStreamReaderInfo, 
45,447,684✔
992
  STSchema** schemas, SSHashObj* ranges, SSHashObj* gidHash, SSTriggerWalNewRsp* rsp, int64_t ver) {
993
  int32_t code = 0;
45,447,684✔
994
  int32_t lino = 0;
45,447,684✔
995
  uint64_t id = 0;
45,447,684✔
996
  WalMetaResult walMeta = {0};
45,445,489✔
997
  void* pTask = sStreamReaderInfo->pTask;
45,452,759✔
998
  SSDataBlock * pBlock = (SSDataBlock*)rsp->dataBlock;
45,462,339✔
999

1000
  if (tStartDecode(pCoder) < 0) {
45,456,370✔
1001
    code = TSDB_CODE_INVALID_MSG;
×
1002
    TSDB_CHECK_CODE(code, lino, end);
×
1003
  }
1004

1005
  SSubmitTbData submitTbData = {0};
45,470,225✔
1006
  uint8_t       version = 0;
45,470,225✔
1007
  if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) {
45,451,435✔
1008
    code = TSDB_CODE_INVALID_MSG;
×
1009
    TSDB_CHECK_CODE(code, lino, end);
×
1010
  }
1011
  version = (submitTbData.flags >> 8) & 0xff;
45,451,435✔
1012
  submitTbData.flags = submitTbData.flags & 0xff;
45,451,435✔
1013
  // STREAM_CHECK_CONDITION_GOTO(version < 2, TDB_CODE_SUCCESS);
1014
  if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
45,451,435✔
1015
    if (tStartDecode(pCoder) < 0) {
271,514✔
1016
      code = TSDB_CODE_INVALID_MSG;
×
1017
      TSDB_CHECK_CODE(code, lino, end);
×
1018
    }
1019
    tEndDecode(pCoder);
271,514✔
1020
  }
1021

1022
  // submit data
1023
  if (tDecodeI64(pCoder, &submitTbData.suid) < 0) {
45,458,565✔
1024
    code = TSDB_CODE_INVALID_MSG;
×
1025
    TSDB_CHECK_CODE(code, lino, end);
×
1026
  }
1027
  if (tDecodeI64(pCoder, &submitTbData.uid) < 0) {
45,455,692✔
1028
    code = TSDB_CODE_INVALID_MSG;
×
1029
    TSDB_CHECK_CODE(code, lino, end);
×
1030
  }
1031

1032
  STREAM_CHECK_CONDITION_GOTO(!uidInTableList(sStreamReaderInfo, submitTbData.suid, submitTbData.uid, &id, rsp->isCalc), TDB_CODE_SUCCESS);
45,455,692✔
1033

1034
  walMeta.id = id;
35,709,126✔
1035
  STimeWindow window = {.skey = INT64_MIN, .ekey = INT64_MAX};
35,709,126✔
1036

1037
  if (ranges != NULL){
35,722,246✔
1038
    void* timerange = tSimpleHashGet(ranges, &id, sizeof(id));
23,141,108✔
1039
    if (timerange == NULL) goto end;;
23,143,932✔
1040
    int64_t* pRange = (int64_t*)timerange;
23,143,932✔
1041
    window.skey = pRange[0];
23,143,932✔
1042
    window.ekey = pRange[1];
23,143,932✔
1043
  }
1044
  
1045
  if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) {
35,720,784✔
1046
    code = TSDB_CODE_INVALID_MSG;
×
1047
    TSDB_CHECK_CODE(code, lino, end);
×
1048
  }
1049

1050
  STSchema*    schema = NULL;
35,720,784✔
1051
  if (sStreamReaderInfo->isVtableStream) {
35,720,784✔
1052
    if (*schemas == NULL) {
11,670,284✔
1053
      *schemas = metaGetTbTSchema(pVnode->pMeta, submitTbData.suid != 0 ? submitTbData.suid : submitTbData.uid, submitTbData.sver, 1);
11,671,696✔
1054
      STREAM_CHECK_NULL_GOTO(*schemas, TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND);
11,672,375✔
1055
    }
1056
    schema = *schemas;
11,668,139✔
1057
  } else {
1058
    if (sStreamReaderInfo->triggerTableSchema == NULL || sStreamReaderInfo->triggerTableSchema->version != submitTbData.sver) {
24,048,299✔
1059
      taosMemoryFree(sStreamReaderInfo->triggerTableSchema);
737,271✔
1060
      sStreamReaderInfo->triggerTableSchema = metaGetTbTSchema(pVnode->pMeta, submitTbData.suid != 0 ? submitTbData.suid : submitTbData.uid, submitTbData.sver, 1);
737,271✔
1061
      STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->triggerTableSchema, TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND);
737,271✔
1062
    }
1063
    schema = sStreamReaderInfo->triggerTableSchema;
24,046,901✔
1064
  }
1065
  SStreamWalDataSlice* pSlice = (SStreamWalDataSlice*)tSimpleHashGet(sStreamReaderInfo->indexHash, &submitTbData.uid, LONG_BYTES);
35,717,451✔
1066
  STREAM_CHECK_NULL_GOTO(pSlice, TSDB_CODE_INVALID_PARA);
35,720,767✔
1067
  int32_t blockStart = pSlice->currentRowIdx;
35,720,767✔
1068

1069
  int32_t numOfRows = 0;
35,725,840✔
1070
  if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
35,709,305✔
1071
    uint64_t nColData = 0;
×
1072
    if (tDecodeU64v(pCoder, &nColData) < 0) {
×
1073
      code = TSDB_CODE_INVALID_MSG;
×
1074
      TSDB_CHECK_CODE(code, lino, end);
×
1075
    }
1076

1077
    SColData colData = {0};
×
1078
    code = tDecodeColData(version, pCoder, &colData, false);
×
1079
    if (code) {
×
1080
      code = TSDB_CODE_INVALID_MSG;
×
1081
      TSDB_CHECK_CODE(code, lino, end);
×
1082
    }
1083

1084
    if (colData.flag != HAS_VALUE) {
×
1085
      code = TSDB_CODE_INVALID_MSG;
×
1086
      TSDB_CHECK_CODE(code, lino, end);
×
1087
    }
1088
    
1089
    walMeta.skey = ((TSKEY *)colData.pData)[0];
×
1090
    walMeta.ekey = ((TSKEY *)colData.pData)[colData.nVal - 1];
×
1091

1092
    int32_t rowStart = 0;
×
1093
    int32_t rowEnd = 0;
×
1094
    STREAM_CHECK_RET_GOTO(getRowRange(&colData, &window, &rowStart, &rowEnd, &numOfRows));
×
1095
    STREAM_CHECK_CONDITION_GOTO(numOfRows <= 0, TDB_CODE_SUCCESS);
×
1096

1097
    int32_t pos = pCoder->pos;
×
1098
    for (int16_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
×
1099
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
×
1100
      STREAM_CHECK_NULL_GOTO(pColData, terrno);
×
1101
      if (pColData->info.colId <= -1) {
×
1102
        pColData->hasNull = true;
×
1103
        continue;
×
1104
      }
1105
      if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
×
1106
        STREAM_CHECK_RET_GOTO(setColData(blockStart, rowStart, rowEnd, &colData, pColData));
×
1107
        continue;
×
1108
      }
1109

1110
      pCoder->pos = pos;
×
1111

1112
      int16_t colId = 0;
×
1113
      if (sStreamReaderInfo->isVtableStream){
×
1114
        int64_t id[2] = {submitTbData.suid, submitTbData.uid};
×
1115
        void *px = tSimpleHashGet(rsp->isCalc ? sStreamReaderInfo->uidHashCalc : sStreamReaderInfo->uidHashTrigger, id, sizeof(id));
×
1116
        STREAM_CHECK_NULL_GOTO(px, TSDB_CODE_INVALID_PARA);
×
1117
        SSHashObj* uInfo = *(SSHashObj **)px;
×
1118
        STREAM_CHECK_NULL_GOTO(uInfo, TSDB_CODE_INVALID_PARA);
×
1119
        int16_t*  tmp = tSimpleHashGet(uInfo, &i, sizeof(i));
×
1120
        if (tmp != NULL) {
×
1121
          colId = *tmp;
×
1122
        } else {
1123
          colId = -1;
×
1124
        }
1125
      } else {
1126
        colId = pColData->info.colId;
×
1127
      }
1128
      
1129
      uint64_t j = 1;
×
1130
      for (; j < nColData; j++) {
×
1131
        int16_t cid = 0;
×
1132
        int32_t posTmp = pCoder->pos;
×
1133
        pCoder->pos += INT_BYTES;
×
1134
        if ((code = tDecodeI16v(pCoder, &cid))) return code;
×
1135
        pCoder->pos = posTmp;
×
1136
        if (cid == colId) {
×
1137
          SColData colDataTmp = {0};
×
1138
          code = tDecodeColData(version, pCoder, &colDataTmp, false);
×
1139
          if (code) {
×
1140
            code = TSDB_CODE_INVALID_MSG;
×
1141
            TSDB_CHECK_CODE(code, lino, end);
×
1142
          }
1143
          STREAM_CHECK_RET_GOTO(setColData(blockStart, rowStart, rowEnd, &colDataTmp, pColData));
×
1144
          break;
×
1145
        }
1146
        code = tDecodeColData(version, pCoder, &colData, true);
×
1147
        if (code) {
×
1148
          code = TSDB_CODE_INVALID_MSG;
×
1149
          TSDB_CHECK_CODE(code, lino, end);
×
1150
        }
1151
      }
1152
      if (j == nColData) {
×
1153
        colDataSetNNULL(pColData, blockStart, numOfRows);
×
1154
      }
1155
    }
1156
  } else {
1157
    uint64_t nRow = 0;
35,709,305✔
1158
    if (tDecodeU64v(pCoder, &nRow) < 0) {
35,720,812✔
1159
      code = TSDB_CODE_INVALID_MSG;
×
1160
      TSDB_CHECK_CODE(code, lino, end);
×
1161
    }
1162
    for (int32_t iRow = 0; iRow < nRow; ++iRow) {
91,800,180✔
1163
      SRow *pRow = (SRow *)(pCoder->data + pCoder->pos);
56,079,390✔
1164
      pCoder->pos += pRow->len;
56,080,926✔
1165

1166
      if (iRow == 0){
56,076,541✔
1167
#ifndef NO_UNALIGNED_ACCESS
1168
        walMeta.skey = pRow->ts;
35,716,393✔
1169
#else
1170
        walMeta.skey = taosGetInt64Aligned(&pRow->ts);
1171
#endif
1172
      }
1173
      if (iRow == nRow - 1) {
56,073,036✔
1174
#ifndef NO_UNALIGNED_ACCESS
1175
        walMeta.ekey = pRow->ts;
35,712,099✔
1176
#else
1177
        walMeta.ekey = taosGetInt64Aligned(&pRow->ts);
1178
#endif
1179
      }
1180

1181
      if (pRow->ts < window.skey || pRow->ts > window.ekey) {
56,072,300✔
1182
        continue;
2,265,119✔
1183
      }
1184
     
1185
      for (int16_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {  // reader todo test null
380,282,969✔
1186
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
326,456,337✔
1187
        STREAM_CHECK_NULL_GOTO(pColData, terrno);
326,465,212✔
1188
        if (pColData->info.colId <= -1) {
326,465,212✔
1189
          pColData->hasNull = true;
102,129,954✔
1190
          continue;
102,134,357✔
1191
        }
1192
        int16_t colId = 0;
224,437,764✔
1193
        if (sStreamReaderInfo->isVtableStream){
224,437,764✔
1194
          int64_t id[2] = {submitTbData.suid, submitTbData.uid};
29,908,572✔
1195
          void* px = tSimpleHashGet(rsp->isCalc ? sStreamReaderInfo->uidHashCalc : sStreamReaderInfo->uidHashTrigger, id, sizeof(id));
29,908,572✔
1196
          STREAM_CHECK_NULL_GOTO(px, TSDB_CODE_INVALID_PARA);
29,918,740✔
1197
          SSHashObj* uInfo = *(SSHashObj**)px;
29,918,740✔
1198
          STREAM_CHECK_NULL_GOTO(uInfo, TSDB_CODE_INVALID_PARA);
29,915,894✔
1199
          int16_t*  tmp = tSimpleHashGet(uInfo, &i, sizeof(i));
29,915,894✔
1200
          if (tmp != NULL) {
29,910,236✔
1201
            colId = *tmp;
25,805,508✔
1202
          } else {
1203
            colId = -1;
4,104,728✔
1204
          }
1205
          ST_TASK_TLOG("%s vtable colId:%d, i:%d, uid:%" PRId64, __func__, colId, i, submitTbData.uid);
29,919,542✔
1206
        } else {
1207
          colId = pColData->info.colId;
194,529,950✔
1208
        }
1209
        
1210
        SColVal colVal = {0};
224,432,815✔
1211
        int32_t sourceIdx = 0;
224,418,473✔
1212
        while (1) {
1213
          if (sourceIdx >= schema->numOfCols) {
600,682,547✔
1214
            break;
108,756,055✔
1215
          }
1216
          STREAM_CHECK_RET_GOTO(tRowGet(pRow, schema, sourceIdx, &colVal));
491,865,313✔
1217
          if (colVal.cid == colId) {
492,008,613✔
1218
            break;
115,744,539✔
1219
          }
1220
          sourceIdx++;
376,264,074✔
1221
        }
1222
        if (colVal.cid == colId && COL_VAL_IS_VALUE(&colVal)) {
224,500,594✔
1223
          if (IS_VAR_DATA_TYPE(colVal.value.type) || colVal.value.type == TSDB_DATA_TYPE_DECIMAL){
114,153,352✔
1224
            STREAM_CHECK_RET_GOTO(varColSetVarData(pColData, blockStart+ numOfRows, (const char*)colVal.value.pData, colVal.value.nData, !COL_VAL_IS_VALUE(&colVal)));
367,107✔
1225
            // ST_TASK_ILOG("%s vtable colId:%d, i:%d, colData:%p, data:%s, len:%d, rowIndex:%d, offset:%d, uid:%" PRId64, __func__, colId, i, pColData, 
1226
            //   (const char*)colVal.value.pData, colVal.value.nData, blockStart+ numOfRows, pColData->varmeta.offset[blockStart+ numOfRows], submitTbData.uid);
1227
          } else {
1228
            STREAM_CHECK_RET_GOTO(colDataSetVal(pColData, blockStart + numOfRows, (const char*)(&(colVal.value.val)), !COL_VAL_IS_VALUE(&colVal)));
113,786,245✔
1229
          }
1230
        } else {
1231
          colDataSetNULL(pColData, blockStart + numOfRows);
110,347,242✔
1232
        }
1233
      }
1234
      
1235
      numOfRows++;
53,814,249✔
1236
    }
1237
  }
1238

1239
  if (numOfRows > 0) {
35,719,255✔
1240
    if (!sStreamReaderInfo->isVtableStream) {
35,719,255✔
1241
      SStorageAPI  api = {0};
24,043,267✔
1242
      initStorageAPI(&api);
24,044,702✔
1243
      STREAM_CHECK_RET_GOTO(processTag(pVnode, sStreamReaderInfo, rsp->isCalc, &api, submitTbData.uid, pBlock, blockStart, numOfRows, 1));
24,043,262✔
1244
    }
1245
    
1246
    SColumnInfoData* pColData = taosArrayGetLast(pBlock->pDataBlock);
35,710,543✔
1247
    STREAM_CHECK_NULL_GOTO(pColData, terrno);
35,714,873✔
1248
    STREAM_CHECK_RET_GOTO(colDataSetNItems(pColData, blockStart, (const char*)&ver, numOfRows, 1, false));
35,714,873✔
1249
  }
1250

1251
  ST_TASK_DLOG("%s process submit data:skey %" PRId64 ", ekey %" PRId64 ", id %" PRIu64
35,714,892✔
1252
    ", uid:%" PRId64 ", ver:%d, row index:%d, rows:%d", __func__, window.skey, window.ekey, 
1253
    id, submitTbData.uid, submitTbData.sver, pSlice->currentRowIdx, numOfRows);
1254
  pSlice->currentRowIdx += numOfRows;
35,720,676✔
1255
  pBlock->info.rows += numOfRows;
35,714,840✔
1256
  
1257
  if (gidHash == NULL) goto end;
35,720,670✔
1258

1259
  WalMetaResult* data = (WalMetaResult*)tSimpleHashGet(gidHash, &walMeta.id, LONG_BYTES);
12,576,737✔
1260
  if (data != NULL) {
12,571,665✔
1261
    if (walMeta.skey < data->skey) data->skey = walMeta.skey;
×
1262
    if (walMeta.ekey > data->ekey) data->ekey = walMeta.ekey;
×
1263
  } else {
1264
    STREAM_CHECK_RET_GOTO(tSimpleHashPut(gidHash, &walMeta.id, LONG_BYTES, &walMeta, sizeof(WalMetaResult)));
12,571,665✔
1265
  }
1266

1267
end:
45,441,572✔
1268
  if (code != 0) {                                                             \
45,451,953✔
1269
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code)); \
×
1270
  }
1271
  tEndDecode(pCoder);
45,451,953✔
1272
  return code;
45,460,773✔
1273
}
1274
// Decodes a whole submit request and feeds each contained table entry to
// scanSubmitTbData().
//   data, len  encoded submit payload
//   ranges     forwarded to scanSubmitTbData()
//   rsp        response being built; when rsp->metaBlock is set, one meta row per id is
//              appended to it after all entries have been scanned
//   ver        WAL version of the request
// The schema cache, the per-id hash and the decoder are released on every path.
// Returns 0 on success or the first error.
static int32_t scanSubmitData(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamReaderInfo,
  void* data, int32_t len, SSHashObj* ranges, SSTriggerWalNewRsp* rsp, int64_t ver) {
  int32_t    code = 0;
  int32_t    lino = 0;
  STSchema*  vtSchema = NULL;
  SDecoder   decoder = {0};
  SSHashObj* gidHash = NULL;
  void*      pTask = sStreamReaderInfo->pTask;

  tDecoderInit(&decoder, data, len);

  code = (tStartDecode(&decoder) < 0) ? TSDB_CODE_INVALID_MSG : code;
  TSDB_CHECK_CODE(code, lino, end);

  uint64_t nSubmitTbData = 0;
  code = (tDecodeU64v(&decoder, &nSubmitTbData) < 0) ? TSDB_CODE_INVALID_MSG : code;
  TSDB_CHECK_CODE(code, lino, end);

  // The per-id summary is only needed when a meta block has to be produced.
  if (rsp->metaBlock != NULL) {
    gidHash = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    STREAM_CHECK_NULL_GOTO(gidHash, terrno);
  }

  int32_t entry = 0;
  while (entry < nSubmitTbData) {
    STREAM_CHECK_RET_GOTO(scanSubmitTbData(pVnode, &decoder, sStreamReaderInfo, &vtSchema, ranges, gidHash, rsp, ver));
    entry++;
  }

  tEndDecode(&decoder);

  if (rsp->metaBlock == NULL) {
    goto end;
  }

  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->metaBlock, ((SSDataBlock*)rsp->metaBlock)->info.rows + tSimpleHashGetSize(gidHash)));

  int32_t iter = 0;
  for (void* it = tSimpleHashIterate(gidHash, NULL, &iter); it != NULL; it = tSimpleHashIterate(gidHash, it, &iter)) {
    WalMetaResult* pMeta = (WalMetaResult*)it;
    STREAM_CHECK_RET_GOTO(buildWalMetaBlockNew(rsp->metaBlock, pMeta->id, pMeta->skey, pMeta->ekey, ver));
    ((SSDataBlock*)rsp->metaBlock)->info.rows++;
    ST_TASK_DLOG("%s process meta data:skey %" PRId64 ", ekey %" PRId64 ", id %" PRIu64
          ", ver:%"PRId64, __func__, pMeta->skey, pMeta->ekey, pMeta->id, ver);
  }

end:
  taosMemoryFree(vtSchema);
  tSimpleHashCleanup(gidHash);
  tDecoderClear(&decoder);
  return code;
}
1327

1328
/*
 * Pre-scan a single encoded SSubmitTbData entry and count the rows it contributes.
 *
 * pCoder            decoder positioned at the start of the entry; tEndDecode() is called on
 *                   every exit path
 * sStreamReaderInfo reader state, forwarded to processAutoCreateTableNew() and uidInTableList()
 * ranges            optional hash keyed by *gid; each value is read as int64_t[2] = {skey, ekey}.
 *                   NULL means no time filtering. A missing entry makes the function return
 *                   success without counting anything.
 * gid               [out] filled in by uidInTableList()
 * uid               [out] table uid decoded from the entry
 * numOfRows         [in/out] row count; assigned when no time window applies, incremented per
 *                   matching row when a window applies to row-format data (callers pass 0)
 * isCalc            forwarded to uidInTableList()
 *
 * Returns 0 on success (including "table not of interest"), TSDB_CODE_INVALID_MSG on a decode
 * failure, or the error returned by a callee.
 */
static int32_t scanSubmitTbDataPre(SDecoder *pCoder, SStreamTriggerReaderInfo* sStreamReaderInfo, SSHashObj* ranges, 
  uint64_t* gid, int64_t* uid, int32_t* numOfRows, bool isCalc) {
  int32_t code = 0;
  int32_t lino = 0;
  void* pTask = sStreamReaderInfo->pTask;
  // Must be initialised before the first `goto end`: the cleanup code reads pCreateTbReq.
  SSubmitTbData submitTbData = {0};
  uint8_t       version = 0;

  if (tStartDecode(pCoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }
  // The second byte of the encoded flags carries the format version, the low byte the real flags.
  version = (submitTbData.flags >> 8) & 0xff;
  submitTbData.flags = submitTbData.flags & 0xff;

  // STREAM_CHECK_CONDITION_GOTO(version < 2, TDB_CODE_SUCCESS);
  if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
    submitTbData.pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
    STREAM_CHECK_NULL_GOTO(submitTbData.pCreateTbReq, terrno);
    STREAM_CHECK_RET_GOTO(tDecodeSVCreateTbReq(pCoder, submitTbData.pCreateTbReq));
    STREAM_CHECK_RET_GOTO(processAutoCreateTableNew(sStreamReaderInfo, submitTbData.pCreateTbReq));
  }

  // submit data
  if (tDecodeI64(pCoder, &submitTbData.suid) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }
  if (tDecodeI64(pCoder, uid) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  // Tables that are not in the reader's table list are skipped silently.
  STREAM_CHECK_CONDITION_GOTO(!uidInTableList(sStreamReaderInfo, submitTbData.suid, *uid, gid, isCalc), TDB_CODE_SUCCESS);

  STimeWindow window = {.skey = INT64_MIN, .ekey = INT64_MAX};

  if (ranges != NULL){
    void* timerange = tSimpleHashGet(ranges, gid, sizeof(*gid));
    if (timerange == NULL) goto end;
    int64_t* pRange = (int64_t*)timerange;
    window.skey = pRange[0];
    window.ekey = pRange[1];
  }
  
  if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    uint64_t nColData = 0;
    if (tDecodeU64v(pCoder, &nColData) < 0) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, end);
    }

    // Only the first column is decoded; it is used below for the row range / row count.
    SColData colData = {0};
    code = tDecodeColData(version, pCoder, &colData, false);
    if (code) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, end);
    }

    if (colData.flag != HAS_VALUE) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, end);
    }
    int32_t rowStart = 0;
    int32_t rowEnd = 0;
    if (window.skey != INT64_MIN || window.ekey != INT64_MAX) {
      STREAM_CHECK_RET_GOTO(getRowRange(&colData, &window, &rowStart, &rowEnd, numOfRows));
    } else {
      (*numOfRows) = colData.nVal;
    } 
  } else {
    uint64_t nRow = 0;
    if (tDecodeU64v(pCoder, &nRow) < 0) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, end);
    }

    if (window.skey != INT64_MIN || window.ekey != INT64_MAX) { 
      // NOTE(review): pRow->len is trusted here; there is no bounds check against the decoder size.
      for (uint64_t iRow = 0; iRow < nRow; ++iRow) {
        SRow *pRow = (SRow *)(pCoder->data + pCoder->pos);
        pCoder->pos += pRow->len;
        if (pRow->ts < window.skey || pRow->ts > window.ekey) {
          continue;
        }
        (*numOfRows)++;
      }
    } else {
      (*numOfRows) = nRow;
    }
  }
  
end:
  tDestroySVSubmitCreateTbReq(submitTbData.pCreateTbReq, TSDB_MSG_FLG_DECODE);
  taosMemoryFreeClear(submitTbData.pCreateTbReq);
  tEndDecode(pCoder);
  return code;
}
1435

1436
/*
 * Pre-scan an encoded submit request: count the rows of every table entry and accumulate
 * the per-uid counts in sStreamReaderInfo->indexHash and the total in rsp->totalRows.
 *
 * data/len  encoded submit payload handed to the decoder
 * ranges    forwarded to scanSubmitTbDataPre(); may be NULL
 * rsp       totalRows is increased; isCalc is forwarded to scanSubmitTbDataPre()
 *
 * Returns 0 on success, TSDB_CODE_INVALID_MSG on a decode failure, or a callee's error code.
 */
static int32_t scanSubmitDataPre(SStreamTriggerReaderInfo* sStreamReaderInfo, void* data, int32_t len, SSHashObj* ranges, SSTriggerWalNewRsp* rsp) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SDecoder decoder = {0};
  void*    pTask = sStreamReaderInfo->pTask;
  uint64_t nSubmitTbData = 0;

  tDecoderInit(&decoder, data, len);
  if (tStartDecode(&decoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  if (tDecodeU64v(&decoder, &nSubmitTbData) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  for (int32_t idx = 0; idx < nSubmitTbData; idx++) {
    uint64_t groupId = -1;
    int64_t  tbUid = 0;
    int32_t  rows = 0;

    STREAM_CHECK_RET_GOTO(scanSubmitTbDataPre(&decoder, sStreamReaderInfo, ranges, &groupId, &tbUid, &rows, rsp->isCalc));
    if (rows <= 0) {
      // nothing to return for this table entry
      continue;
    }
    rsp->totalRows += rows;

    SStreamWalDataSlice* pSlice = (SStreamWalDataSlice*)tSimpleHashGet(sStreamReaderInfo->indexHash, &tbUid, LONG_BYTES);
    if (pSlice == NULL) {
      // first time this uid is seen: create its slice
      SStreamWalDataSlice fresh = {.gId=groupId,.numRows=rows,.currentRowIdx=0,.startRowIdx=0};
      ST_TASK_DLOG("%s first uid:%" PRId64 ", gid:%" PRIu64 ", numOfRows:%d", __func__, tbUid, groupId, fresh.numRows);
      STREAM_CHECK_RET_GOTO(tSimpleHashPut(sStreamReaderInfo->indexHash, &tbUid, LONG_BYTES, &fresh, sizeof(fresh)));
      continue;
    }

    // uid already present: accumulate into the existing slice
    pSlice->numRows += rows;
    ST_TASK_DLOG("%s again uid:%" PRId64 ", gid:%" PRIu64 ", total numOfRows:%d", __func__, tbUid, groupId, pSlice->numRows);
    pSlice->gId = groupId;
  }

  tEndDecode(&decoder);

end:
  tDecoderClear(&decoder);
  return code;
}
1482

1483
/* Reset every SStreamWalDataSlice stored in indexHash: row indexes and count to 0, gId to -1. */
static void resetIndexHash(SSHashObj* indexHash){
  int32_t iter = 0;
  for (void* pEntry = tSimpleHashIterate(indexHash, NULL, &iter); pEntry != NULL;
       pEntry = tSimpleHashIterate(indexHash, pEntry, &iter)) {
    SStreamWalDataSlice* pSlice = (SStreamWalDataSlice*)pEntry;
    pSlice->startRowIdx = 0;
    pSlice->currentRowIdx = 0;
    pSlice->numRows = 0;
    pSlice->gId = -1;
  }
}
49,302,829✔
1494

1495
/*
 * Assign each slice in indexHash a contiguous row range, in hash iteration order:
 * startRowIdx and currentRowIdx are set to the running sum of the preceding slices' numRows.
 * pTask is only used by the logging macro.
 */
static void buildIndexHash(SSHashObj* indexHash, void* pTask){
  int32_t iter = 0;
  int32_t nextStart = 0;
  for (void* pe = tSimpleHashIterate(indexHash, NULL, &iter); pe != NULL;
       pe = tSimpleHashIterate(indexHash, pe, &iter)) {
    SStreamWalDataSlice* pSlice = (SStreamWalDataSlice*)pe;
    pSlice->startRowIdx = nextStart;
    pSlice->currentRowIdx = nextStart;
    nextStart += pSlice->numRows;
    ST_TASK_DLOG("%s uid:%" PRId64 ", gid:%" PRIu64 ", startRowIdx:%d, numRows:%d", __func__, *(int64_t*)(tSimpleHashGetKey(pe, NULL)),
    pSlice->gId, pSlice->startRowIdx, pSlice->numRows);
  }
}
3,757,577✔
1508

1509
/* Debug-log every slice (uid, gid, startRowIdx, numRows) stored in indexHash. */
static void printIndexHash(SSHashObj* indexHash, void* pTask){
  int32_t iter = 0;
  for (void* pe = tSimpleHashIterate(indexHash, NULL, &iter); pe != NULL;
       pe = tSimpleHashIterate(indexHash, pe, &iter)) {
    SStreamWalDataSlice* pSlice = (SStreamWalDataSlice*)pe;
    ST_TASK_DLOG("%s uid:%" PRId64 ", gid:%" PRIu64 ", startRowIdx:%d, numRows:%d", __func__, *(int64_t*)(tSimpleHashGetKey(pe, NULL)),
    pSlice->gId, pSlice->startRowIdx, pSlice->numRows);
  }
}
3,748,860✔
1518

1519
/*
 * Recompute the slices in indexHash after rows were filtered out.
 *
 * pRet->pData is read as an int8_t indicator array with one entry per pre-filter row, consumed
 * in hash iteration order; a zero entry means the row was dropped. Each slice's numRows is
 * reduced by its dropped rows and startRowIdx is re-based on the surviving rows before it.
 * If pRet->pData is NULL no row is treated as dropped.
 */
static void filterIndexHash(SSHashObj* indexHash, SColumnInfoData* pRet){
  int32_t iter = 0;
  int32_t nextStart = 0;
  int32_t cursor = 0;  // position in the indicator array
  int8_t* pKeep = (int8_t*)pRet->pData;

  for (void* pe = tSimpleHashIterate(indexHash, NULL, &iter); pe != NULL;
       pe = tSimpleHashIterate(indexHash, pe, &iter)) {
    SStreamWalDataSlice* pSlice = (SStreamWalDataSlice*)pe;
    int32_t              before = pSlice->numRows;
    int32_t              survivors = before;

    pSlice->startRowIdx = nextStart;
    if (pKeep != NULL) {
      for (int32_t r = 0; r < before; r++) {
        if (!pKeep[cursor++]) {
          survivors--;
        }
      }
    }
    pSlice->numRows = survivors;
    nextStart += survivors;
    stTrace("stream reader re build index hash uid:%" PRId64 ", gid:%" PRIu64 ", startRowIdx:%d, numRows:%d", *(int64_t*)(tSimpleHashGetKey(pe, NULL)),
    pSlice->gId, pSlice->startRowIdx, pSlice->numRows);
  }
}
94,266✔
1540

1541
/*
 * First pass over the WAL starting at resultRsp->ver: pre-scan submit messages to count rows
 * (scanSubmitDataPre) and hand every other message to processMeta().
 *
 * Scanning stops when the WAL has no further message, when an ALTER_TABLE message is met after
 * rows were already collected (resultRsp->ver is then stepped back by one), or when
 * resultRsp->totalRows reaches STREAM_RETURN_ROWS_NUM. resultRsp->ver and resultRsp->verTime
 * track the last message read.
 *
 * TSDB_CODE_WAL_LOG_NOT_EXIST is not treated as an error. Returns 0 or a callee's error code.
 */
static int32_t prepareIndexMetaData(SWalReader* pWalReader, SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* resultRsp){
  int32_t code = 0;
  int32_t lino = 0;
  void*   pTask = sStreamReaderInfo->pTask;

  code = walReaderSeekVer(pWalReader, resultRsp->ver);
  if (code == TSDB_CODE_WAL_LOG_NOT_EXIST){
    // requested version is older than anything in the WAL: move forward to the first version
    if (resultRsp->ver < walGetFirstVer(pWalReader->pWal)) {
      resultRsp->ver = walGetFirstVer(pWalReader->pWal);
    }
    ST_TASK_DLOG("%s scan wal error:%s",  __func__, tstrerror(code));
    code = TSDB_CODE_SUCCESS;
    goto end;
  }
  STREAM_CHECK_RET_GOTO(code);

  for (;;) {
    code = walNextValidMsg(pWalReader, true);
    if (code == TSDB_CODE_WAL_LOG_NOT_EXIST){
      ST_TASK_DLOG("%s scan wal error:%s", __func__, tstrerror(code));
      code = TSDB_CODE_SUCCESS;
      goto end;
    }
    STREAM_CHECK_RET_GOTO(code);

    resultRsp->ver = pWalReader->curVersion;
    SWalCont* pCont = &pWalReader->pHead->head;
    resultRsp->verTime = pCont->ingestTs;
    int64_t walVer = pCont->version;
    ST_TASK_DLOG("%s scan wal ver:%" PRId64 ", type:%d, deleteData:%d, deleteTb:%d", __func__,
      walVer, pCont->msgType, sStreamReaderInfo->deleteReCalc, sStreamReaderInfo->deleteOutTbl);

    if (pCont->msgType == TDMT_VND_SUBMIT) {
      // submit payload follows an SSubmitReq2Msg header
      void*   pSubmit = POINTER_SHIFT(pCont->body, sizeof(SSubmitReq2Msg));
      int32_t submitLen = pCont->bodyLen - sizeof(SSubmitReq2Msg);
      STREAM_CHECK_RET_GOTO(scanSubmitDataPre(sStreamReaderInfo, pSubmit, submitLen, NULL, resultRsp));
    } else if (pCont->msgType == TDMT_VND_ALTER_TABLE && resultRsp->totalRows > 0) {
      resultRsp->ver--;
      break;
    } else {
      // every other message carries an SMsgHead header
      void*   pMeta = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
      int32_t metaLen = pCont->bodyLen - sizeof(SMsgHead);
      STREAM_CHECK_RET_GOTO(processMeta(pCont->msgType, sStreamReaderInfo, pMeta, metaLen, resultRsp, walVer));
    }

    ST_TASK_DLOG("%s scan wal next ver:%" PRId64 ", totalRows:%d", __func__, resultRsp->ver, resultRsp->totalRows);
    if (resultRsp->totalRows >= STREAM_RETURN_ROWS_NUM) {
      break;
    }
  }
  
end:
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
1594

1595
static int32_t prepareIndexData(SWalReader* pWalReader, SStreamTriggerReaderInfo* sStreamReaderInfo, 
20,227,252✔
1596
  SArray* versions, SSHashObj* ranges, SSTriggerWalNewRsp* rsp){
1597
  int32_t      code = 0;
20,227,252✔
1598
  int32_t      lino = 0;
20,227,252✔
1599

1600
  for(int32_t i = 0; i < taosArrayGetSize(versions); i++) {
43,372,618✔
1601
    int64_t *ver = taosArrayGet(versions, i);
23,144,700✔
1602
    if (ver == NULL) continue;
23,146,134✔
1603

1604
    STREAM_CHECK_RET_GOTO(walFetchHead(pWalReader, *ver));
23,146,134✔
1605
    if(pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
23,147,568✔
1606
      TAOS_CHECK_RETURN(walSkipFetchBody(pWalReader));
×
1607
      continue;
×
1608
    }
1609
    STREAM_CHECK_RET_GOTO(walFetchBody(pWalReader));
23,145,366✔
1610

1611
    SWalCont* wCont = &pWalReader->pHead->head;
23,146,134✔
1612
    void*   pBody = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
23,146,134✔
1613
    int32_t bodyLen = wCont->bodyLen - sizeof(SSubmitReq2Msg);
23,146,134✔
1614

1615
    STREAM_CHECK_RET_GOTO(scanSubmitDataPre(sStreamReaderInfo, pBody, bodyLen, ranges, rsp));
23,142,513✔
1616
  }
1617
  
1618
end:
20,228,686✔
1619
  return code;
20,228,686✔
1620
}
1621

1622
/*
 * Run the reader's filter (pFilterInfo) over resultRsp->dataBlock. If the filter removed rows,
 * the per-table slices in the index hash are recomputed from the filter's indicator column.
 *
 * Returns 0 on success or the error code of qStreamFilter().
 */
static int32_t filterData(SSTriggerWalNewRsp* resultRsp, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t          code = 0;
  int32_t          lino = 0;
  SColumnInfoData* pIndicator = NULL;
  SSDataBlock*     pData = (SSDataBlock*)resultRsp->dataBlock;
  int64_t          rowsBefore = pData->info.rows;

  STREAM_CHECK_RET_GOTO(qStreamFilter(pData, sStreamReaderInfo->pFilterInfo, &pIndicator));

  if (pData->info.rows < rowsBefore) {
    filterIndexHash(sStreamReaderInfo->indexHash, pIndicator);
  }

end:
  colDataDestroy(pIndicator);
  taosMemoryFree(pIndicator);
  return code;
}
1639

1640
static int32_t processWalVerMetaDataNew(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamReaderInfo, 
29,070,576✔
1641
                                    SSTriggerWalNewRsp* resultRsp) {
1642
  int32_t      code = 0;
29,070,576✔
1643
  int32_t      lino = 0;
29,070,576✔
1644
  void* pTask = sStreamReaderInfo->pTask;
29,070,576✔
1645
                                        
1646
  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
29,080,815✔
1647
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);
29,057,475✔
1648
  resetIndexHash(sStreamReaderInfo->indexHash);
29,057,475✔
1649
  blockDataEmpty(resultRsp->dataBlock);
29,080,645✔
1650
  blockDataEmpty(resultRsp->metaBlock);
29,027,510✔
1651
  int64_t lastVer = resultRsp->ver;                                      
29,007,996✔
1652
  STREAM_CHECK_RET_GOTO(prepareIndexMetaData(pWalReader, sStreamReaderInfo, resultRsp));
29,011,631✔
1653
  STREAM_CHECK_CONDITION_GOTO(resultRsp->totalRows == 0, TDB_CODE_SUCCESS);
29,052,794✔
1654

1655
  buildIndexHash(sStreamReaderInfo->indexHash, pTask);
1,130,150✔
1656
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(((SSDataBlock*)resultRsp->dataBlock), resultRsp->totalRows));
1,130,150✔
1657
  while(lastVer < resultRsp->ver) {
24,417,734✔
1658
    STREAM_CHECK_RET_GOTO(walFetchHead(pWalReader, lastVer++));
23,298,558✔
1659
    if(pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
23,268,495✔
1660
      TAOS_CHECK_RETURN(walSkipFetchBody(pWalReader));
986,766✔
1661
      continue;
986,766✔
1662
    }
1663
    STREAM_CHECK_RET_GOTO(walFetchBody(pWalReader));
22,292,734✔
1664
    SWalCont* wCont = &pWalReader->pHead->head;
22,307,394✔
1665
    void*   pBody = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
22,320,600✔
1666
    int32_t bodyLen = wCont->bodyLen - sizeof(SSubmitReq2Msg);
22,316,966✔
1667

1668
    STREAM_CHECK_RET_GOTO(scanSubmitData(pVnode, sStreamReaderInfo, pBody, bodyLen, NULL, resultRsp, wCont->version));
22,316,966✔
1669
  }
1670

1671
  int32_t metaRows = resultRsp->totalRows - ((SSDataBlock*)resultRsp->dataBlock)->info.rows;
1,130,150✔
1672
  STREAM_CHECK_RET_GOTO(filterData(resultRsp, sStreamReaderInfo));
1,130,150✔
1673
  resultRsp->totalRows = ((SSDataBlock*)resultRsp->dataBlock)->info.rows + metaRows;
1,130,150✔
1674

1675
end:
29,056,531✔
1676
  ST_TASK_DLOG("vgId:%d %s end, get result totalRows:%d, process:%"PRId64"/%"PRId64, TD_VID(pVnode), __func__, 
29,056,531✔
1677
          resultRsp->totalRows, resultRsp->ver, walGetAppliedVer(pWalReader->pWal));
1678
  walCloseReader(pWalReader);
29,054,361✔
1679
  return code;
29,041,117✔
1680
}
1681

1682
static int32_t processWalVerDataNew(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamReaderInfo, 
20,220,602✔
1683
                                    SArray* versions, SSHashObj* ranges, SSTriggerWalNewRsp* rsp) {
1684
  int32_t      code = 0;
20,220,602✔
1685
  int32_t      lino = 0;
20,220,602✔
1686

1687
  void* pTask = sStreamReaderInfo->pTask;
20,220,602✔
1688
  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
20,227,159✔
1689
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);
20,219,274✔
1690
  
1691
  if (taosArrayGetSize(versions) > 0) {
20,219,274✔
1692
    rsp->ver = *(int64_t*)taosArrayGetLast(versions);
2,627,427✔
1693
  }
1694
  
1695
  resetIndexHash(sStreamReaderInfo->indexHash);
20,219,274✔
1696
  STREAM_CHECK_RET_GOTO(prepareIndexData(pWalReader, sStreamReaderInfo, versions, ranges, rsp));
20,225,818✔
1697
  STREAM_CHECK_CONDITION_GOTO(rsp->totalRows == 0, TDB_CODE_SUCCESS);
20,224,298✔
1698

1699
  buildIndexHash(sStreamReaderInfo->indexHash, pTask);
2,627,427✔
1700

1701
  blockDataEmpty(rsp->dataBlock);
2,627,427✔
1702
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->dataBlock, rsp->totalRows));
2,627,427✔
1703

1704
  for(int32_t i = 0; i < taosArrayGetSize(versions); i++) {
25,772,127✔
1705
    int64_t *ver = taosArrayGet(versions, i);
23,143,940✔
1706
    if (ver == NULL) continue;
23,146,134✔
1707

1708
    STREAM_CHECK_RET_GOTO(walFetchHead(pWalReader, *ver));
23,146,134✔
1709
    if(pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
23,140,412✔
1710
      TAOS_CHECK_RETURN(walSkipFetchBody(pWalReader));
×
1711
      continue;
×
1712
    }
1713
    STREAM_CHECK_RET_GOTO(walFetchBody(pWalReader));
23,143,273✔
1714
    SWalCont* wCont = &pWalReader->pHead->head;
23,143,295✔
1715
    void*   pBody = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
23,143,295✔
1716
    int32_t bodyLen = wCont->bodyLen - sizeof(SSubmitReq2Msg);
23,146,141✔
1717

1718
    STREAM_CHECK_RET_GOTO(scanSubmitData(pVnode, sStreamReaderInfo, pBody, bodyLen, ranges, rsp, wCont->version));
23,146,141✔
1719
  }
1720
  // printDataBlock(rsp->dataBlock, __func__, "processWalVerDataNew");
1721
  STREAM_CHECK_RET_GOTO(filterData(rsp, sStreamReaderInfo));
2,628,187✔
1722
  rsp->totalRows = ((SSDataBlock*)rsp->dataBlock)->info.rows;
2,627,427✔
1723

1724
end:
20,228,686✔
1725
  ST_TASK_DLOG("vgId:%d %s end, get result totalRows:%d, process:%"PRId64"/%"PRId64, TD_VID(pVnode), __func__, 
20,228,686✔
1726
            rsp->totalRows, rsp->ver, walGetAppliedVer(pWalReader->pWal));
1727
  walCloseReader(pWalReader);
20,228,686✔
1728
  return code;
20,224,965✔
1729
}
1730

1731
/*
 * Build an array of SSchema describing all columns of table `uid`.
 *
 * For a child table the schema is taken from its super table; for a normal table from the
 * table itself. Any other table type is rejected with TSDB_CODE_INVALID_PARA.
 *
 * schemas  [out] newly created SArray of SSchema on success (caller destroys it);
 *          set to NULL on failure
 *
 * Returns 0 on success or an error code.
 */
static int32_t buildScheamFromMeta(SVnode* pVnode, int64_t uid, SArray** schemas) {
  int32_t code = 0;
  int32_t lino = 0;
  SMetaReader metaReader = {0};
  SStorageAPI api = {0};
  initStorageAPI(&api);
  *schemas = taosArrayInit(8, sizeof(SSchema));
  STREAM_CHECK_NULL_GOTO(*schemas, terrno);
  
  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
  STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, uid));

  SSchemaWrapper* sSchemaWrapper = NULL;
  if (metaReader.me.type == TD_CHILD_TABLE) {
    int64_t suid = metaReader.me.ctbEntry.suid;
    // release the child-table entry before loading the super-table entry into the same reader
    tDecoderClear(&metaReader.coder);
    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, suid));
    sSchemaWrapper = &metaReader.me.stbEntry.schemaRow;
  } else if (metaReader.me.type == TD_NORMAL_TABLE) {
    sSchemaWrapper = &metaReader.me.ntbEntry.schemaRow;
  } else {
    // No schema is available for this table type; fail instead of dereferencing NULL below.
    qError("invalid table type:%d", metaReader.me.type);
    code = TSDB_CODE_INVALID_PARA;
    lino = __LINE__;
    goto end;
  }

  for (size_t j = 0; j < sSchemaWrapper->nCols; j++) {
    SSchema* s = sSchemaWrapper->pSchema + j;
    STREAM_CHECK_NULL_GOTO(taosArrayPush(*schemas, s), terrno);
  }

end:
  api.metaReaderFn.clearReader(&metaReader);
  STREAM_PRINT_LOG_END(code, lino);
  if (code != 0)  {
    taosArrayDestroy(*schemas);
    *schemas = NULL;
  }
  return code;
}
1769

1770
/*
 * Reduce `schemas` in place to the columns whose ids are listed in `cols`, in the order of
 * `cols`. Ids with no matching schema are ignored.
 *
 * The selected entries are appended behind the original ones and the original prefix is then
 * popped. Capacity is reserved up front, presumably so that pushing an element that lives
 * inside the array cannot trigger a reallocation (review: confirm against taosArrayPush).
 *
 * Returns 0 on success or an error code.
 */
static int32_t shrinkScheams(SArray* cols, SArray* schemas) {
  int32_t code = 0;
  int32_t lino = 0;
  size_t  nOrig = taosArrayGetSize(schemas);

  STREAM_CHECK_RET_GOTO(taosArrayEnsureCap(schemas, nOrig + taosArrayGetSize(cols)));

  for (size_t iCol = 0; iCol < taosArrayGetSize(cols); iCol++) {
    col_id_t* pWanted = taosArrayGet(cols, iCol);
    STREAM_CHECK_NULL_GOTO(pWanted, terrno);

    // linear search through the original entries only
    for (size_t iSchema = 0; iSchema < nOrig; iSchema++) {
      SSchema* pCandidate = taosArrayGet(schemas, iSchema);
      STREAM_CHECK_NULL_GOTO(pCandidate, terrno);
      if (pCandidate->colId != *pWanted) {
        continue;
      }
      STREAM_CHECK_NULL_GOTO(taosArrayPush(schemas, pCandidate), terrno);
      break;
    }
  }
  taosArrayPopFrontBatch(schemas, nOrig);

end:
  return code;
}
1792

1793
/*
 * Create an empty data block for table `uid` whose columns are restricted to `cids`.
 *
 * The schema is loaded from meta (buildScheamFromMeta), reduced to `cids` (shrinkScheams) and
 * turned into a block (createDataBlockForStream). `ver` and `window` are currently unused
 * because the WAL scan step is commented out.
 *
 * pBlock  [out] receives the new block on success; untouched on failure
 * Returns 0 on success or an error code.
 */
static int32_t processWalVerDataVTable(SVnode* pVnode, SArray *cids, int64_t ver,
  int64_t uid, STimeWindow* window, SSDataBlock** pBlock) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SArray*      pSchemas = NULL;
  SSDataBlock* pNewBlock = NULL;

  STREAM_CHECK_RET_GOTO(buildScheamFromMeta(pVnode, uid, &pSchemas));
  STREAM_CHECK_RET_GOTO(shrinkScheams(cids, pSchemas));
  STREAM_CHECK_RET_GOTO(createDataBlockForStream(pSchemas, &pNewBlock));

  pNewBlock->info.id.uid = uid;

  // STREAM_CHECK_RET_GOTO(scanWalOneVer(pVnode, pBlock2, ver, uid, window));
  //printDataBlock(pBlock2, __func__, "");

  // hand ownership to the caller; the cleanup below then sees NULL
  *pBlock = pNewBlock;
  pNewBlock = NULL;

end:
  STREAM_PRINT_LOG_END(code, lino);
  blockDataDestroy(pNewBlock);
  taosArrayDestroy(pSchemas);
  return code;
}
1819

1820
static int32_t createTSAndCondition(int64_t start, int64_t end, SLogicConditionNode** pCond,
×
1821
                                    STargetNode* pTargetNodeTs) {
1822
  int32_t code = 0;
×
1823
  int32_t lino = 0;
×
1824

1825
  SColumnNode*         pCol = NULL;
×
1826
  SColumnNode*         pCol1 = NULL;
×
1827
  SValueNode*          pVal = NULL;
×
1828
  SValueNode*          pVal1 = NULL;
×
1829
  SOperatorNode*       op = NULL;
×
1830
  SOperatorNode*       op1 = NULL;
×
1831
  SLogicConditionNode* cond = NULL;
×
1832

1833
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pCol));
×
1834
  pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
×
1835
  pCol->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
×
1836
  pCol->node.resType.bytes = LONG_BYTES;
×
1837
  pCol->slotId = pTargetNodeTs->slotId;
×
1838
  pCol->dataBlockId = pTargetNodeTs->dataBlockId;
×
1839

1840
  STREAM_CHECK_RET_GOTO(nodesCloneNode((SNode*)pCol, (SNode**)&pCol1));
×
1841

1842
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&pVal));
×
1843
  pVal->node.resType.type = TSDB_DATA_TYPE_BIGINT;
×
1844
  pVal->node.resType.bytes = LONG_BYTES;
×
1845
  pVal->datum.i = start;
×
1846
  pVal->typeData = start;
×
1847

1848
  STREAM_CHECK_RET_GOTO(nodesCloneNode((SNode*)pVal, (SNode**)&pVal1));
×
1849
  pVal1->datum.i = end;
×
1850
  pVal1->typeData = end;
×
1851

1852
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&op));
×
1853
  op->opType = OP_TYPE_GREATER_EQUAL;
×
1854
  op->node.resType.type = TSDB_DATA_TYPE_BOOL;
×
1855
  op->node.resType.bytes = CHAR_BYTES;
×
1856
  op->pLeft = (SNode*)pCol;
×
1857
  op->pRight = (SNode*)pVal;
×
1858
  pCol = NULL;
×
1859
  pVal = NULL;
×
1860

1861
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&op1));
×
1862
  op1->opType = OP_TYPE_LOWER_EQUAL;
×
1863
  op1->node.resType.type = TSDB_DATA_TYPE_BOOL;
×
1864
  op1->node.resType.bytes = CHAR_BYTES;
×
1865
  op1->pLeft = (SNode*)pCol1;
×
1866
  op1->pRight = (SNode*)pVal1;
×
1867
  pCol1 = NULL;
×
1868
  pVal1 = NULL;
×
1869

1870
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_LOGIC_CONDITION, (SNode**)&cond));
×
1871
  cond->condType = LOGIC_COND_TYPE_AND;
×
1872
  cond->node.resType.type = TSDB_DATA_TYPE_BOOL;
×
1873
  cond->node.resType.bytes = CHAR_BYTES;
×
1874
  STREAM_CHECK_RET_GOTO(nodesMakeList(&cond->pParameterList));
×
1875
  STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)op));
×
1876
  op = NULL;
×
1877
  STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)op1));
×
1878
  op1 = NULL;
×
1879

1880
  *pCond = cond;
×
1881

1882
end:
×
1883
  if (code != 0) {
×
1884
    nodesDestroyNode((SNode*)pCol);
×
1885
    nodesDestroyNode((SNode*)pCol1);
×
1886
    nodesDestroyNode((SNode*)pVal);
×
1887
    nodesDestroyNode((SNode*)pVal1);
×
1888
    nodesDestroyNode((SNode*)op);
×
1889
    nodesDestroyNode((SNode*)op1);
×
1890
    nodesDestroyNode((SNode*)cond);
×
1891
  }
1892
  STREAM_PRINT_LOG_END(code, lino);
×
1893

1894
  return code;
×
1895
}
1896

1897
/*
1898
static int32_t createExternalConditions(SStreamRuntimeFuncInfo* data, SLogicConditionNode** pCond, STargetNode* pTargetNodeTs, STimeRangeNode* node) {
1899
  int32_t              code = 0;
1900
  int32_t              lino = 0;
1901
  SLogicConditionNode* pAndCondition = NULL;
1902
  SLogicConditionNode* cond = NULL;
1903

1904
  if (pTargetNodeTs == NULL) {
1905
    vError("stream reader %s no ts column", __func__);
1906
    return TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN;
1907
  }
1908
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_LOGIC_CONDITION, (SNode**)&cond));
1909
  cond->condType = LOGIC_COND_TYPE_OR;
1910
  cond->node.resType.type = TSDB_DATA_TYPE_BOOL;
1911
  cond->node.resType.bytes = CHAR_BYTES;
1912
  STREAM_CHECK_RET_GOTO(nodesMakeList(&cond->pParameterList));
1913

1914
  for (int i = 0; i < taosArrayGetSize(data->pStreamPesudoFuncVals); ++i) {
1915
    data->curIdx = i;
1916

1917
    SReadHandle handle = {0};
1918
    calcTimeRange(node, data, &handle.winRange, &handle.winRangeValid);
1919
    if (!handle.winRangeValid) {
1920
      stError("stream reader %s invalid time range, skey:%" PRId64 ", ekey:%" PRId64, __func__, handle.winRange.skey,
1921
              handle.winRange.ekey);
1922
      continue;
1923
    }
1924
    STREAM_CHECK_RET_GOTO(createTSAndCondition(handle.winRange.skey, handle.winRange.ekey, &pAndCondition, pTargetNodeTs));
1925
    stDebug("%s create condition skey:%" PRId64 ", eksy:%" PRId64, __func__, handle.winRange.skey, handle.winRange.ekey);
1926
    STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)pAndCondition));
1927
    pAndCondition = NULL;
1928
  }
1929

1930
  *pCond = cond;
1931

1932
end:
1933
  if (code != 0) {
1934
    nodesDestroyNode((SNode*)pAndCondition);
1935
    nodesDestroyNode((SNode*)cond);
1936
  }
1937
  STREAM_PRINT_LOG_END(code, lino);
1938

1939
  return code;
1940
}
1941
*/
1942

1943
static int32_t processCalaTimeRange(SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo, SResFetchReq* req,
1,226,128✔
1944
                                    STimeRangeNode* node, SReadHandle* handle, bool isExtWin) {
1945
  int32_t code = 0;
1,226,128✔
1946
  int32_t lino = 0;
1,226,128✔
1947
  void* pTask = sStreamReaderCalcInfo->pTask;
1,226,128✔
1948
  STimeWindow* pWin = isExtWin ? &handle->extWinRange : &handle->winRange;
1,226,128✔
1949
  bool* pValid = isExtWin ? &handle->extWinRangeValid : &handle->winRangeValid;
1,226,128✔
1950
  
1951
  if (req->pStRtFuncInfo->withExternalWindow) {
1,226,128✔
1952
    sStreamReaderCalcInfo->tmpRtFuncInfo.curIdx = 0;
800,282✔
1953
    sStreamReaderCalcInfo->tmpRtFuncInfo.triggerType = req->pStRtFuncInfo->triggerType;
800,282✔
1954
    
1955
    SSTriggerCalcParam* pFirst = taosArrayGet(req->pStRtFuncInfo->pStreamPesudoFuncVals, 0);
800,282✔
1956
    SSTriggerCalcParam* pLast = taosArrayGetLast(req->pStRtFuncInfo->pStreamPesudoFuncVals);
800,282✔
1957
    STREAM_CHECK_NULL_GOTO(pFirst, terrno);
800,282✔
1958
    STREAM_CHECK_NULL_GOTO(pLast, terrno);
800,282✔
1959

1960
    if (!node->needCalc) {
800,282✔
1961
      pWin->skey = pFirst->wstart;
612,418✔
1962
      pWin->ekey = pLast->wend;
612,418✔
1963
      *pValid = true;
612,418✔
1964
      if (req->pStRtFuncInfo->triggerType == STREAM_TRIGGER_SLIDING) {
612,418✔
1965
        pWin->ekey--;
444,985✔
1966
      }
1967
    } else {
1968
      SSTriggerCalcParam* pTmp = taosArrayGet(sStreamReaderCalcInfo->tmpRtFuncInfo.pStreamPesudoFuncVals, 0);
187,864✔
1969
      memcpy(pTmp, pFirst, sizeof(*pTmp));
187,864✔
1970

1971
      STREAM_CHECK_RET_GOTO(streamCalcCurrWinTimeRange(node, &sStreamReaderCalcInfo->tmpRtFuncInfo, pWin, pValid, 1));
187,864✔
1972
      if (*pValid) {
187,864✔
1973
        int64_t skey = pWin->skey;
187,864✔
1974

1975
        memcpy(pTmp, pLast, sizeof(*pTmp));
187,864✔
1976
        STREAM_CHECK_RET_GOTO(streamCalcCurrWinTimeRange(node, &sStreamReaderCalcInfo->tmpRtFuncInfo, pWin, pValid, 2));
187,864✔
1977

1978
        if (*pValid) {
187,864✔
1979
          pWin->skey = skey;
187,864✔
1980
        }
1981
      }
1982
      pWin->ekey--;
187,864✔
1983
    }
1984
  } else {
1985
    if (!node->needCalc) {
425,846✔
1986
      SSTriggerCalcParam* pCurr = taosArrayGet(req->pStRtFuncInfo->pStreamPesudoFuncVals, req->pStRtFuncInfo->curIdx);
425,846✔
1987
      pWin->skey = pCurr->wstart;
425,846✔
1988
      pWin->ekey = pCurr->wend;
425,846✔
1989
      *pValid = true;
425,846✔
1990
      if (req->pStRtFuncInfo->triggerType == STREAM_TRIGGER_SLIDING) {
425,846✔
1991
        pWin->ekey--;
378,626✔
1992
      }
1993
    } else {
1994
      STREAM_CHECK_RET_GOTO(streamCalcCurrWinTimeRange(node, req->pStRtFuncInfo, pWin, pValid, 3));
×
1995
      pWin->ekey--;
×
1996
    }
1997
  }
1998

1999
  ST_TASK_DLOG("%s type:%s, withExternalWindow:%d, skey:%" PRId64 ", ekey:%" PRId64 ", validRange:%d", 
1,226,128✔
2000
      __func__, isExtWin ? "interp range" : "scan time range", req->pStRtFuncInfo->withExternalWindow, pWin->skey, pWin->ekey, *pValid);
2001

2002
end:
116,282✔
2003

2004
  if (code) {
1,226,128✔
2005
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2006
  }
2007
  
2008
  return code;
1,226,128✔
2009
}
2010

2011
static int32_t processTs(SVnode* pVnode, SStreamTsResponse* tsRsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
3,123,835✔
2012
                                  SStreamReaderTaskInner* pTaskInner) {
2013
  int32_t code = 0;
3,123,835✔
2014
  int32_t lino = 0;
3,123,835✔
2015

2016
  void* pTask = sStreamReaderInfo->pTask;
3,123,835✔
2017
  tsRsp->tsInfo = taosArrayInit(qStreamGetTableListGroupNum(pTaskInner->pTableList), sizeof(STsInfo));
3,123,835✔
2018
  STREAM_CHECK_NULL_GOTO(tsRsp->tsInfo, terrno);
3,115,797✔
2019
  while (true) {
2,179,919✔
2020
    bool hasNext = false;
5,293,508✔
2021
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
5,301,546✔
2022
    if (hasNext) {
5,290,715✔
2023
      pTaskInner->api.tsdReader.tsdReaderReleaseDataBlock(pTaskInner->pReader);
2,215,769✔
2024
      STsInfo* tsInfo = taosArrayReserve(tsRsp->tsInfo, 1);
2,210,070✔
2025
      STREAM_CHECK_NULL_GOTO(tsInfo, terrno)
2,208,530✔
2026
      if (pTaskInner->options.order == TSDB_ORDER_ASC) {
2,208,530✔
2027
        tsInfo->ts = pTaskInner->pResBlock->info.window.skey;
1,402,401✔
2028
      } else {
2029
        tsInfo->ts = pTaskInner->pResBlock->info.window.ekey;
811,216✔
2030
      }
2031
      tsInfo->gId = (sStreamReaderInfo->groupByTbname || sStreamReaderInfo->tableType != TSDB_SUPER_TABLE) ? 
4,920,020✔
2032
                    pTaskInner->pResBlock->info.id.uid : qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
2,696,340✔
2033
      ST_TASK_DLOG("vgId:%d %s get ts:%" PRId64 ", gId:%" PRIu64 ", ver:%" PRId64, TD_VID(pVnode), __func__, tsInfo->ts,
2,217,979✔
2034
              tsInfo->gId, tsRsp->ver);
2035
    }
2036
    
2037
    pTaskInner->currentGroupIndex++;
5,296,558✔
2038
    if (pTaskInner->currentGroupIndex >= qStreamGetTableListGroupNum(pTaskInner->pTableList) || pTaskInner->options.gid != 0) {
5,290,695✔
2039
      break;
2040
    }
2041
    STREAM_CHECK_RET_GOTO(resetTsdbReader(pTaskInner));
2,174,105✔
2042
  }
2043

2044
end:
3,118,022✔
2045
  STREAM_PRINT_LOG_END_WITHID(code, lino);
3,118,022✔
2046
  return code;
3,123,835✔
2047
}
2048

2049
// Installs the trigger/calc uid hash tables carried by a set-table request into the reader info.
// The tables are exchanged with the ones currently held by sStreamReaderInfo (TSWAP), then both are
// required to be non-NULL. On success the reader is flagged as a virtual-table stream grouped by table name.
// A TDMT_STREAM_TRIGGER_PULL_RSP carrying only the result code is always sent back.
static int32_t vnodeProcessStreamSetTableReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t lino = 0;
  int32_t code = 0;
  size_t  size = 0;
  void*   buf = NULL;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;

  ST_TASK_DLOG("vgId:%d %s start, trigger hash size:%d, calc hash size:%d", TD_VID(pVnode), __func__,
               tSimpleHashGetSize(req->setTableReq.uidInfoTrigger), tSimpleHashGetSize(req->setTableReq.uidInfoCalc));

  // Both swaps happen before either validity check, exactly in this order.
  TSWAP(sStreamReaderInfo->uidHashTrigger, req->setTableReq.uidInfoTrigger);
  TSWAP(sStreamReaderInfo->uidHashCalc, req->setTableReq.uidInfoCalc);
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidHashTrigger, TSDB_CODE_INVALID_PARA);
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidHashCalc, TSDB_CODE_INVALID_PARA);

  sStreamReaderInfo->groupByTbname = true;
  sStreamReaderInfo->isVtableStream = true;

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rpcRsp = {0};
  rpcRsp.msgType = TDMT_STREAM_TRIGGER_PULL_RSP;
  rpcRsp.info = pMsg->info;
  rpcRsp.pCont = buf;
  rpcRsp.contLen = size;
  rpcRsp.code = code;
  tmsgSendRsp(&rpcRsp);
  return code;
}
2074

2075
// Answers a "last ts" pull request: scans group by group in descending order over the full time range
// and reports one timestamp per group (see processTs). The response version is state.applied + 1.
// The reply is always sent; the temporary ts array and the reader task are released before returning.
static int32_t vnodeProcessStreamLastTsReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 lino = 0;
  int32_t                 code = 0;
  size_t                  size = 0;
  void*                   buf = NULL;
  SStreamTsResponse       tsRsp = {0};
  SStreamReaderTaskInner* pTaskInner = NULL;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;

  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  BUILD_OPTION(scanOpts, sStreamReaderInfo, -1, TSDB_ORDER_DESC, INT64_MIN, INT64_MAX, sStreamReaderInfo->tsSchemas, true,
               STREAM_SCAN_GROUP_ONE_BY_ONE, 0, true, sStreamReaderInfo->uidHashTrigger);
  SStorageAPI storageApi = {0};
  initStorageAPI(&storageApi);
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &scanOpts, &pTaskInner, NULL, &storageApi));

  tsRsp.ver = pVnode->state.applied + 1;

  STREAM_CHECK_RET_GOTO(processTs(pVnode, &tsRsp, sStreamReaderInfo, pTaskInner));
  ST_TASK_DLOG("vgId:%d %s get result, ver:%" PRId64, TD_VID(pVnode), __func__, tsRsp.ver);
  STREAM_CHECK_RET_GOTO(buildTsRsp(&tsRsp, &buf, &size));

  // Dump every collected entry, but only walk the array when debug logging is switched on.
  if (stDebugFlag & DEBUG_DEBUG) {
    int32_t total = taosArrayGetSize(tsRsp.tsInfo);
    int32_t idx = 0;
    while (idx < total) {
      STsInfo* tsInfo = TARRAY_GET_ELEM(tsRsp.tsInfo, idx);
      ST_TASK_DLOG("vgId:%d %s get ts:%" PRId64 ", gId:%" PRIu64, TD_VID(pVnode), __func__, tsInfo->ts, tsInfo->gId);
      idx++;
    }
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rpcRsp = {0};
  rpcRsp.msgType = TDMT_STREAM_TRIGGER_PULL_RSP;
  rpcRsp.info = pMsg->info;
  rpcRsp.pCont = buf;
  rpcRsp.contLen = size;
  rpcRsp.code = code;
  tmsgSendRsp(&rpcRsp);

  taosArrayDestroy(tsRsp.tsInfo);
  releaseStreamTask(&pTaskInner);
  return code;
}
2116

2117
// Answers a "first ts" pull request: scans ascending from req->firstTsReq.startTime at the requested
// version (optionally limited to req->firstTsReq.gid) and reports one timestamp per group (see processTs).
// The response version is state.applied. The reply is always sent; the temporary ts array and the reader
// task are released before returning.
static int32_t vnodeProcessStreamFirstTsReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 lino = 0;
  int32_t                 code = 0;
  size_t                  size = 0;
  void*                   buf = NULL;
  SStreamTsResponse       tsRsp = {0};
  SStreamReaderTaskInner* pTaskInner = NULL;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, startTime:%"PRId64" ver:%"PRId64" gid:%"PRId64, TD_VID(pVnode), __func__,
               req->firstTsReq.startTime, req->firstTsReq.ver, req->firstTsReq.gid);

  BUILD_OPTION(scanOpts, sStreamReaderInfo, req->firstTsReq.ver, TSDB_ORDER_ASC, req->firstTsReq.startTime, INT64_MAX,
               sStreamReaderInfo->tsSchemas, true, STREAM_SCAN_GROUP_ONE_BY_ONE, req->firstTsReq.gid, true,
               sStreamReaderInfo->uidHashTrigger);
  SStorageAPI storageApi = {0};
  initStorageAPI(&storageApi);
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &scanOpts, &pTaskInner, NULL, &storageApi));

  tsRsp.ver = pVnode->state.applied;
  STREAM_CHECK_RET_GOTO(processTs(pVnode, &tsRsp, sStreamReaderInfo, pTaskInner));

  ST_TASK_DLOG("vgId:%d %s get result size:%"PRIzu", ver:%"PRId64, TD_VID(pVnode), __func__,
               taosArrayGetSize(tsRsp.tsInfo), tsRsp.ver);
  STREAM_CHECK_RET_GOTO(buildTsRsp(&tsRsp, &buf, &size));

  // Dump every collected entry, but only walk the array when debug logging is switched on.
  if (stDebugFlag & DEBUG_DEBUG) {
    int32_t total = taosArrayGetSize(tsRsp.tsInfo);
    int32_t idx = 0;
    while (idx < total) {
      STsInfo* tsInfo = TARRAY_GET_ELEM(tsRsp.tsInfo, idx);
      ST_TASK_DLOG("vgId:%d %s get ts:%" PRId64 ", gId:%" PRIu64, TD_VID(pVnode), __func__, tsInfo->ts, tsInfo->gId);
      idx++;
    }
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rpcRsp = {0};
  rpcRsp.msgType = TDMT_STREAM_TRIGGER_PULL_RSP;
  rpcRsp.info = pMsg->info;
  rpcRsp.pCont = buf;
  rpcRsp.contLen = size;
  rpcRsp.code = code;
  tmsgSendRsp(&rpcRsp);

  taosArrayDestroy(tsRsp.tsInfo);
  releaseStreamTask(&pTaskInner);
  return code;
}
2156

2157
// Handles a TSDB meta pull: returns, for each data block found by the reader, a row of
// (skey, ekey, uid, [groupId unless this is a vtable stream], rows).
//
// A request whose base.type is STRIGGER_PULL_TSDB_META opens a new reader task and registers it in
// sStreamReaderInfo->streamTaskMap under the session key; any other type is treated as a continuation
// and must find the task already registered (TSDB_CODE_STREAM_NO_CONTEXT otherwise).
// At most STREAM_RETURN_ROWS_NUM rows are returned per call; once the reader is exhausted the task is
// removed from the map. A TDMT_STREAM_TRIGGER_PULL_RSP is always sent, carrying the result code.
static int32_t vnodeProcessStreamTsdbMetaReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStreamReaderTaskInner* pTaskInner = NULL;
  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_META);

  if (req->base.type == STRIGGER_PULL_TSDB_META) {
    BUILD_OPTION(options, sStreamReaderInfo, req->tsdbMetaReq.ver, req->tsdbMetaReq.order, req->tsdbMetaReq.startTime, req->tsdbMetaReq.endTime, sStreamReaderInfo->tsSchemas, true,
      (req->tsdbMetaReq.gid != 0 ? STREAM_SCAN_GROUP_ONE_BY_ONE : STREAM_SCAN_ALL), req->tsdbMetaReq.gid, true, sStreamReaderInfo->uidHashTrigger);
    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, &api));

    code = taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner));
    if (code != 0) {
      // The map did not take the task, so nobody else will ever release it: do it here to avoid a leak.
      lino = __LINE__;
      releaseStreamTask(&pTaskInner);
      goto end;
    }

    STREAM_CHECK_RET_GOTO(createBlockForTsdbMeta(&pTaskInner->pResBlockDst, sStreamReaderInfo->isVtableStream));
  } else {
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  }

  blockDataCleanup(pTaskInner->pResBlockDst);
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pTaskInner->pResBlockDst, STREAM_RETURN_ROWS_NUM));
  bool hasNext = true;
  while (true) {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
    if (!hasNext) {
      break;
    }
    // Only block-level metadata is reported, so the block itself is released immediately.
    pTaskInner->api.tsdReader.tsdReaderReleaseDataBlock(pTaskInner->pReader);
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);

    int32_t index = 0;
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.window.skey));
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.window.ekey));
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.id.uid));
    if (!sStreamReaderInfo->isVtableStream) {
      // The group id column is only written for non-vtable streams.
      STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.id.groupId));
    }
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.rows));

    stDebug("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
    pTaskInner->pResBlockDst->info.rows++;
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
      // Page is full; hasNext stays true so the task is kept in the map for the follow-up request.
      break;
    }
  }

  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
  printDataBlock(pTaskInner->pResBlockDst, __func__, "meta", ((SStreamTask *)sStreamReaderInfo->pTask)->streamId);
  if (!hasNext) {
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  return code;
}
2229

2230
// Handles a TSDB ts-data pull for a single table (req->tsdbTsDataReq.uid) of a non-vtable stream.
// Reads the trigger columns in ascending order over [skey, ekey], fills tags, applies the task filter,
// merges everything into the task's destination block and finally transforms it into a block shaped
// like sStreamReaderInfo->tsBlock, which is serialized into the reply.
// The reply is always sent; the result block and the reader task are released before returning.
static int32_t vnodeProcessStreamTsdbTsDataReqNonVTable(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 lino = 0;
  int32_t                 code = 0;
  size_t                  size = 0;
  void*                   buf = NULL;
  SSDataBlock*            pBlockRes = NULL;
  SStreamReaderTaskInner* pTaskInner = NULL;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, ver:%"PRId64",skey:%"PRId64",ekey:%"PRId64",uid:%"PRId64",suid:%"PRId64, TD_VID(pVnode), __func__,
               req->tsdbTsDataReq.ver, req->tsdbTsDataReq.skey, req->tsdbTsDataReq.ekey, req->tsdbTsDataReq.uid,
               req->tsdbTsDataReq.suid);

  BUILD_OPTION(scanOpts, sStreamReaderInfo, req->tsdbTsDataReq.ver, TSDB_ORDER_ASC, req->tsdbTsDataReq.skey, req->tsdbTsDataReq.ekey,
               sStreamReaderInfo->triggerCols, false, STREAM_SCAN_ALL, 0, true, NULL);
  scanOpts.uid = req->tsdbTsDataReq.uid;

  SStorageAPI storageApi = {0};
  initStorageAPI(&storageApi);
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &scanOpts, &pTaskInner, sStreamReaderInfo->triggerResBlock, &storageApi));
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->tsBlock, false, &pBlockRes));

  for (;;) {
    bool more = false;
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &more));
    if (!more) {
      break;
    }

    if (!sStreamReaderInfo->isVtableStream) {
      pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
    }

    SSDataBlock* pData = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pData));
    // Tags are only filled in when the reader actually produced rows.
    if (pData != NULL && pData->info.rows > 0) {
      STREAM_CHECK_RET_GOTO(processTag(pVnode, sStreamReaderInfo, false, &storageApi, pData->info.id.uid, pData,
                                       0, pData->info.rows, 1));
    }

    STREAM_CHECK_RET_GOTO(qStreamFilter(pData, pTaskInner->pFilterInfo, NULL));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pData));
    ST_TASK_DLOG("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
                 TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
                 pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
  }

  blockDataTransform(pBlockRes, pTaskInner->pResBlockDst);

  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rpcRsp = {0};
  rpcRsp.msgType = TDMT_STREAM_TRIGGER_PULL_RSP;
  rpcRsp.info = pMsg->info;
  rpcRsp.pCont = buf;
  rpcRsp.contLen = size;
  rpcRsp.code = code;
  tmsgSendRsp(&rpcRsp);

  blockDataDestroy(pBlockRes);
  releaseStreamTask(&pTaskInner);
  return code;
}
2292

2293
// Handles a TSDB ts-data pull for a single table (req->tsdbTsDataReq.uid / suid) of a vtable stream.
// Reads the ts schema columns in ascending order over [skey, ekey] and merges every block the reader
// produces into one result block shaped like sStreamReaderInfo->tsBlock, which is serialized into the
// reply. No tag processing or filtering is applied on this path.
// The reply is always sent; the result block and the reader task are released before returning.
static int32_t vnodeProcessStreamTsdbTsDataReqVTable(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  SStreamReaderTaskInner* pTaskInner = NULL;
  void*                   buf = NULL;
  size_t                  size = 0;
  SSDataBlock*            pBlockRes = NULL;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  // Routine entry trace: logged at debug level like every other pull handler, not as an error.
  ST_TASK_DLOG("vgId:%d %s start, ver:%"PRId64",skey:%"PRId64",ekey:%"PRId64",uid:%"PRId64",suid:%"PRId64, TD_VID(pVnode), __func__, req->tsdbTsDataReq.ver,
                req->tsdbTsDataReq.skey, req->tsdbTsDataReq.ekey,
                req->tsdbTsDataReq.uid, req->tsdbTsDataReq.suid);

  BUILD_OPTION(options, sStreamReaderInfo, req->tsdbTsDataReq.ver, TSDB_ORDER_ASC, req->tsdbTsDataReq.skey, req->tsdbTsDataReq.ekey,
               sStreamReaderInfo->tsSchemas, true, STREAM_SCAN_ALL, 0, true, NULL);
  // The target table comes from the request rather than from the reader info.
  options.suid = req->tsdbTsDataReq.suid;
  options.uid = req->tsdbTsDataReq.uid;
  SStorageAPI api = {0};
  initStorageAPI(&api);
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->tsBlock, &api));
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->tsBlock, false, &pBlockRes));

  while (1) {
    bool hasNext = false;
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
    if (!hasNext) {
      break;
    }

    SSDataBlock* pBlock = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pBlockRes, pBlock));
    ST_TASK_DLOG("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
            TD_VID(pVnode), __func__, pBlockRes->info.window.skey, pBlockRes->info.window.ekey,
            pBlockRes->info.id.uid, pBlockRes->info.id.groupId, pBlockRes->info.rows);
  }

  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlockRes->info.rows);
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  blockDataDestroy(pBlockRes);

  releaseStreamTask(&pTaskInner);
  return code;
}
2344

2345
// Handles a TSDB trigger-data pull: returns trigger-column data (with tags filled and the task filter
// applied) starting at req->tsdbTriggerDataReq.startTime, optionally limited to one group.
//
// A request whose base.type is STRIGGER_PULL_TSDB_TRIGGER_DATA opens a new reader task and registers it
// in sStreamReaderInfo->streamTaskMap under the session key; any other type is treated as a continuation
// and must find the task already registered (TSDB_CODE_STREAM_NO_CONTEXT otherwise).
// Each call returns at most one reader block (see the todo below); once the reader is exhausted the
// task is removed from the map. A TDMT_STREAM_TRIGGER_PULL_RSP is always sent, carrying the result code.
static int32_t vnodeProcessStreamTsdbTriggerDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  SStreamReaderTaskInner* pTaskInner = NULL;
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start. ver:%"PRId64",order:%d,startTs:%"PRId64",gid:%"PRId64, TD_VID(pVnode), __func__, req->tsdbTriggerDataReq.ver, req->tsdbTriggerDataReq.order, req->tsdbTriggerDataReq.startTime, req->tsdbTriggerDataReq.gid);

  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_TRIGGER_DATA);

  if (req->base.type == STRIGGER_PULL_TSDB_TRIGGER_DATA) {
    BUILD_OPTION(options, sStreamReaderInfo, req->tsdbTriggerDataReq.ver, req->tsdbTriggerDataReq.order, req->tsdbTriggerDataReq.startTime, INT64_MAX,
                 sStreamReaderInfo->triggerCols, false, (req->tsdbTriggerDataReq.gid != 0 ? STREAM_SCAN_GROUP_ONE_BY_ONE : STREAM_SCAN_ALL),
                 req->tsdbTriggerDataReq.gid, true, NULL);
    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, &api));

    code = taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner));
    if (code != 0) {
      // The map did not take the task, so nobody else will ever release it: do it here to avoid a leak.
      lino = __LINE__;
      releaseStreamTask(&pTaskInner);
      goto end;
    }
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
  } else {
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  }

  blockDataCleanup(pTaskInner->pResBlockDst);
  bool hasNext = true;
  while (1) {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
    if (!hasNext) {
      break;
    }
    // Resolve the group id once and stamp it on both the source and the destination block.
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
    pTaskInner->pResBlockDst->info.id.groupId = pTaskInner->pResBlock->info.id.groupId;

    SSDataBlock* pBlock = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
    if (pBlock != NULL && pBlock->info.rows > 0) {
      STREAM_CHECK_RET_GOTO(
        processTag(pVnode, sStreamReaderInfo, false, &pTaskInner->api, pBlock->info.id.uid, pBlock, 0, pBlock->info.rows, 1));
    }
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, pTaskInner->pFilterInfo, NULL));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
    ST_TASK_DLOG("vgId:%d %s get skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
    // NOTE: this condition is always true, so exactly one reader block is returned per request and
    // hasNext stays true for the follow-up request. Kept as-is (marked todo by the original author).
    if (pTaskInner->pResBlockDst->info.rows >= 0) { //todo
      break;
    }
  }

  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
  if (!hasNext) {
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);

  return code;
}
2415

2416
// Handles a TSDB calc-data pull: reads trigger columns for one group (req->tsdbCalcDataReq.gid) in
// ascending order over [skey, ekey], applies the task filter, and transforms the merged rows into a block
// shaped like sStreamReaderInfo->calcResBlock, which is serialized into the reply.
//
// Fails with TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN when the reader has no trigger columns.
// A request whose base.type is STRIGGER_PULL_TSDB_CALC_DATA opens a new reader task and registers it in
// sStreamReaderInfo->streamTaskMap under the session key; any other type is treated as a continuation
// and must find the task already registered (TSDB_CODE_STREAM_NO_CONTEXT otherwise).
// Reading stops once STREAM_RETURN_ROWS_NUM rows are collected; when the reader is exhausted the task
// is removed from the map. A TDMT_STREAM_TRIGGER_PULL_RSP is always sent, carrying the result code.
static int32_t vnodeProcessStreamTsdbCalcDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;
  SSDataBlock*            pBlockRes = NULL;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, skey:%"PRId64",ekey:%"PRId64",gid:%"PRId64",ver:%"PRId64, TD_VID(pVnode), __func__,
    req->tsdbCalcDataReq.skey, req->tsdbCalcDataReq.ekey, req->tsdbCalcDataReq.gid, req->tsdbCalcDataReq.ver);

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->triggerCols, TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN);

  SStreamReaderTaskInner* pTaskInner = NULL;
  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_CALC_DATA);

  if (req->base.type == STRIGGER_PULL_TSDB_CALC_DATA) {
    BUILD_OPTION(options, sStreamReaderInfo, req->tsdbCalcDataReq.ver, TSDB_ORDER_ASC, req->tsdbCalcDataReq.skey, req->tsdbCalcDataReq.ekey,
                 sStreamReaderInfo->triggerCols, false, STREAM_SCAN_GROUP_ONE_BY_ONE, req->tsdbCalcDataReq.gid, true, NULL);
    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, &api));

    code = taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner));
    if (code != 0) {
      // The map did not take the task, so nobody else will ever release it: do it here to avoid a leak.
      lino = __LINE__;
      releaseStreamTask(&pTaskInner);
      goto end;
    }
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
  } else {
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  }

  blockDataCleanup(pTaskInner->pResBlockDst);
  bool hasNext = true;
  while (1) {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
    if (!hasNext) {
      break;
    }
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);

    SSDataBlock* pBlock = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, pTaskInner->pFilterInfo, NULL));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
      // Page is full; hasNext stays true so the task is kept in the map for the follow-up request.
      break;
    }
  }

  // Build the reply in the calc-result layout, sized to hold everything that was merged.
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->calcResBlock, false, &pBlockRes));
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlockRes, pTaskInner->pResBlockDst->info.capacity));
  blockDataTransform(pBlockRes, pTaskInner->pResBlockDst);
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));
  printDataBlock(pBlockRes, __func__, "tsdb_calc_data", ((SStreamTask*)pTask)->streamId);
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlockRes->info.rows);
  if (!hasNext) {
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  blockDataDestroy(pBlockRes);
  return code;
}
2485

2486
// Handles a TSDB data pull for one table (req->tsdbDataReq.uid), returning only the columns listed in
// req->tsdbDataReq.cids.
//
// A request whose base.type is STRIGGER_PULL_TSDB_DATA builds a schema restricted to the requested
// columns, opens a reader on that single table and registers the task in
// sStreamReaderInfo->streamTaskMap keyed by the table uid; any other type is treated as a continuation
// and must find the task already registered (TSDB_CODE_STREAM_NO_CONTEXT otherwise).
// Reading stops once STREAM_RETURN_ROWS_NUM rows are collected; when the reader is exhausted the task
// is removed from the map. A TDMT_STREAM_TRIGGER_PULL_RSP is always sent, carrying the result code.
static int32_t vnodeProcessStreamTsdbVirtalDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;
  int32_t* slotIdList = NULL;
  SArray* sortedCid = NULL;
  SArray* schemas = NULL;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, skey:%"PRId64",ekey:%"PRId64",uid:%"PRId64",ver:%"PRId64, TD_VID(pVnode), __func__,
    req->tsdbDataReq.skey, req->tsdbDataReq.ekey, req->tsdbDataReq.uid, req->tsdbDataReq.ver);

  SStreamReaderTaskInner* pTaskInner = NULL;
  int64_t key = req->tsdbDataReq.uid;

  if (req->base.type == STRIGGER_PULL_TSDB_DATA) {
    // sort cid and build slotIdList
    // slotIdList[j] = i maps the j-th column in sorted-cid order to its position i in the request.
    slotIdList = taosMemoryMalloc(taosArrayGetSize(req->tsdbDataReq.cids) * sizeof(int32_t));
    STREAM_CHECK_NULL_GOTO(slotIdList, terrno);
    sortedCid = taosArrayDup(req->tsdbDataReq.cids, NULL);
    STREAM_CHECK_NULL_GOTO(sortedCid, terrno);
    taosArraySort(sortedCid, sortCid);
    for (int32_t i = 0; i < taosArrayGetSize(req->tsdbDataReq.cids); i++) {
      int16_t* cid = taosArrayGet(req->tsdbDataReq.cids, i);
      STREAM_CHECK_NULL_GOTO(cid, terrno);
      for (int32_t j = 0; j < taosArrayGetSize(sortedCid); j++) {
        int16_t* cidSorted = taosArrayGet(sortedCid, j);
        STREAM_CHECK_NULL_GOTO(cidSorted, terrno);
        if (*cid == *cidSorted) {
          slotIdList[j] = i;
          break;
        }
      }
    }

    STREAM_CHECK_RET_GOTO(buildScheamFromMeta(pVnode, req->tsdbDataReq.uid, &schemas));
    STREAM_CHECK_RET_GOTO(shrinkScheams(req->tsdbDataReq.cids, schemas));
    BUILD_OPTION(options, sStreamReaderInfo, req->tsdbDataReq.ver, req->tsdbDataReq.order, req->tsdbDataReq.skey,
                    req->tsdbDataReq.ekey, schemas, true, STREAM_SCAN_ALL, 0, false, NULL);

    options.suid = req->tsdbDataReq.suid;
    options.uid = req->tsdbDataReq.uid;

    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, &api));

    code = taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner));
    if (code != 0) {
      // The map did not take the task, so nobody else will ever release it: do it here to avoid a leak.
      lino = __LINE__;
      releaseStreamTask(&pTaskInner);
      goto end;
    }

    // The task was created without opening a reader (last BUILD_OPTION-adjacent flag is false), so the
    // query condition is rebuilt here with the sorted schema and the reader is opened on the one table.
    STableKeyInfo       keyInfo = {.uid = req->tsdbDataReq.uid};
    cleanupQueryTableDataCond(&pTaskInner->cond);
    taosArraySort(pTaskInner->options.schemas, sortSSchema);

    STREAM_CHECK_RET_GOTO(qStreamInitQueryTableDataCond(&pTaskInner->cond, pTaskInner->options.order, pTaskInner->options.schemas,
                                                        pTaskInner->options.isSchema, pTaskInner->options.twindows,
                                                        pTaskInner->options.suid, pTaskInner->options.ver, &slotIdList));
    STREAM_CHECK_RET_GOTO(pTaskInner->api.tsdReader.tsdReaderOpen(pVnode, &pTaskInner->cond, &keyInfo, 1, pTaskInner->pResBlock,
                                                             (void**)&pTaskInner->pReader, pTaskInner->idStr, NULL));
    STREAM_CHECK_RET_GOTO(createOneDataBlock(pTaskInner->pResBlock, false, &pTaskInner->pResBlockDst));
  } else {
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  }

  blockDataCleanup(pTaskInner->pResBlockDst);
  bool hasNext = true;
  while (1) {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
    if (!hasNext) {
      break;
    }

    SSDataBlock* pBlock = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
      // Page is full; hasNext stays true so the task is kept in the map for the follow-up request.
      break;
    }
  }
  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
  printDataBlock(pTaskInner->pResBlockDst, __func__, "tsdb_data", ((SStreamTask*)pTask)->streamId);
  if (!hasNext) {
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  taosMemFree(slotIdList);
  taosArrayDestroy(sortedCid);
  taosArrayDestroy(schemas);
  return code;
}
2585

2586
/**
 * Handle a "WAL meta (new)" trigger pull request.
 *
 * Lazily creates the reader's cached meta block, empties it, lets
 * processWalVerMetaNew() fill the response starting from the requested
 * lastVer, then serializes the response and sends it back with
 * TDMT_STREAM_TRIGGER_PULL_RSP.
 *
 * When no rows were produced the reply carries only the 8-byte version and
 * the code TSDB_CODE_STREAM_NO_DATA; that code is reset to 0 before
 * returning to the caller.
 *
 * @param pVnode            vnode serving the request
 * @param pMsg              incoming RPC message (its info is echoed in the reply)
 * @param req               decoded pull request (walMetaNewReq member is read)
 * @param sStreamReaderInfo reader context; may be NULL
 * @return 0 on success / no data, otherwise the error code that was sent
 */
static int32_t vnodeProcessStreamWalMetaNewReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t            code = 0;
  int32_t            lino = 0;
  void*              buf = NULL;
  size_t             size = 0;
  int32_t            len = 0;       // signed serializer result, so errors stay visible
  void*              pTask = NULL;  // initialised up front: `end` is reachable before it is assigned
  SSTriggerWalNewRsp resultRsp = {0};

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, request paras lastVer:%" PRId64, TD_VID(pVnode), __func__, req->walMetaNewReq.lastVer);

  if (sStreamReaderInfo->metaBlock == NULL) {
    STREAM_CHECK_RET_GOTO(createBlockForWalMetaNew((SSDataBlock**)&sStreamReaderInfo->metaBlock));
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(sStreamReaderInfo->metaBlock, STREAM_RETURN_ROWS_NUM));
  }
  blockDataEmpty(sStreamReaderInfo->metaBlock);
  resultRsp.metaBlock = sStreamReaderInfo->metaBlock;
  resultRsp.ver = req->walMetaNewReq.lastVer;
  STREAM_CHECK_RET_GOTO(processWalVerMetaNew(pVnode, &resultRsp, sStreamReaderInfo, req->walMetaNewReq.ctime));

  ST_TASK_DLOG("vgId:%d %s get result last ver:%"PRId64" rows:%d", TD_VID(pVnode), __func__, resultRsp.ver, resultRsp.totalRows);
  STREAM_CHECK_CONDITION_GOTO(resultRsp.totalRows == 0, TDB_CODE_SUCCESS);

  // Two-pass serialization: measure, allocate, then encode.
  // Assumes the serializer returns a negative value on failure, as the
  // group-info serializer in this file is checked for.
  len = tSerializeSStreamWalDataResponse(NULL, 0, &resultRsp, NULL);
  STREAM_CHECK_CONDITION_GOTO(len < 0, len);
  buf = rpcMallocCont(len);
  STREAM_CHECK_NULL_GOTO(buf, terrno);
  len = tSerializeSStreamWalDataResponse(buf, len, &resultRsp, NULL);
  STREAM_CHECK_CONDITION_GOTO(len < 0, len);
  size = (size_t)len;
  printDataBlock(sStreamReaderInfo->metaBlock, __func__, "meta", ((SStreamTask*)pTask)->streamId);

end:
  if (resultRsp.totalRows == 0) {
    // NOTE(review): an earlier failure that left totalRows at 0 is still
    // reported as NO_DATA here, exactly as before - confirm this is intended.
    buf = rpcMallocCont(sizeof(int64_t));
    if (buf != NULL) {
      *(int64_t*)buf = resultRsp.ver;
      size = sizeof(int64_t);
      code = TSDB_CODE_STREAM_NO_DATA;
    } else {
      size = 0;
      code = terrno;
    }
  } else if (code != 0) {
    // Never ship a partially built payload together with an error code.
    rpcFreeCont(buf);
    buf = NULL;
    size = 0;
  }

  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  if (code == TSDB_CODE_STREAM_NO_DATA) {
    code = 0;
  }
  // The task-scoped log macro presumably reads pTask; skip it when no task is known.
  if (pTask != NULL) {
    STREAM_PRINT_LOG_END_WITHID(code, lino);
  }
  blockDataDestroy(resultRsp.deleteBlock);
  blockDataDestroy(resultRsp.dropBlock);

  return code;
}
2633
/**
 * Handle a "WAL meta + data (new)" trigger pull request.
 *
 * Lazily creates the reader's cached meta block, points the response at the
 * reader's meta and trigger blocks, lets processWalVerMetaDataNew() fill it
 * starting from the requested lastVer, then serializes the response (with
 * the reader's index hash) and replies with TDMT_STREAM_TRIGGER_PULL_RSP.
 *
 * When no rows were produced the reply carries only the 8-byte version and
 * the code TSDB_CODE_STREAM_NO_DATA; that code is reset to 0 before
 * returning to the caller.
 *
 * @param pVnode            vnode serving the request
 * @param pMsg              incoming RPC message (its info is echoed in the reply)
 * @param req               decoded pull request (walMetaDataNewReq member is read)
 * @param sStreamReaderInfo reader context; may be NULL
 * @return 0 on success / no data, otherwise the error code that was sent
 */
static int32_t vnodeProcessStreamWalMetaDataNewReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t            code = 0;
  int32_t            lino = 0;
  void*              buf = NULL;
  size_t             size = 0;
  int32_t            len = 0;       // signed serializer result, so errors stay visible
  void*              pTask = NULL;  // initialised up front: `end` is reachable before it is assigned
  SSTriggerWalNewRsp resultRsp = {0};

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, request paras lastVer:%" PRId64, TD_VID(pVnode), __func__, req->walMetaDataNewReq.lastVer);

  if (sStreamReaderInfo->metaBlock == NULL) {
    STREAM_CHECK_RET_GOTO(createBlockForWalMetaNew((SSDataBlock**)&sStreamReaderInfo->metaBlock));
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(sStreamReaderInfo->metaBlock, STREAM_RETURN_ROWS_NUM));
  }
  resultRsp.metaBlock = sStreamReaderInfo->metaBlock;
  resultRsp.dataBlock = sStreamReaderInfo->triggerBlock;
  resultRsp.ver = req->walMetaDataNewReq.lastVer;
  STREAM_CHECK_RET_GOTO(processWalVerMetaDataNew(pVnode, sStreamReaderInfo, &resultRsp));

  STREAM_CHECK_CONDITION_GOTO(resultRsp.totalRows == 0, TDB_CODE_SUCCESS);

  // Two-pass serialization: measure, allocate, then encode.
  // Assumes the serializer returns a negative value on failure, as the
  // group-info serializer in this file is checked for.
  len = tSerializeSStreamWalDataResponse(NULL, 0, &resultRsp, sStreamReaderInfo->indexHash);
  STREAM_CHECK_CONDITION_GOTO(len < 0, len);
  buf = rpcMallocCont(len);
  STREAM_CHECK_NULL_GOTO(buf, terrno);
  len = tSerializeSStreamWalDataResponse(buf, len, &resultRsp, sStreamReaderInfo->indexHash);
  STREAM_CHECK_CONDITION_GOTO(len < 0, len);
  size = (size_t)len;

  printDataBlock(sStreamReaderInfo->metaBlock, __func__, "meta", ((SStreamTask*)pTask)->streamId);
  printDataBlock(sStreamReaderInfo->triggerBlock, __func__, "data", ((SStreamTask*)pTask)->streamId);
  printDataBlock(resultRsp.dropBlock, __func__, "drop", ((SStreamTask*)pTask)->streamId);
  printDataBlock(resultRsp.deleteBlock, __func__, "delete", ((SStreamTask*)pTask)->streamId);
  printIndexHash(sStreamReaderInfo->indexHash, pTask);

end:
  if (resultRsp.totalRows == 0) {
    // NOTE(review): an earlier failure that left totalRows at 0 is still
    // reported as NO_DATA here, exactly as before - confirm this is intended.
    buf = rpcMallocCont(sizeof(int64_t));
    if (buf != NULL) {
      *(int64_t*)buf = resultRsp.ver;
      size = sizeof(int64_t);
      code = TSDB_CODE_STREAM_NO_DATA;
    } else {
      size = 0;
      code = terrno;
    }
  } else if (code != 0) {
    // Never ship a partially built payload together with an error code.
    rpcFreeCont(buf);
    buf = NULL;
    size = 0;
  }

  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  if (code == TSDB_CODE_STREAM_NO_DATA) {
    code = 0;
  }
  blockDataDestroy(resultRsp.deleteBlock);
  blockDataDestroy(resultRsp.dropBlock);

  // The task-scoped log macro presumably reads pTask; skip it when no task is known.
  if (pTask != NULL) {
    STREAM_PRINT_LOG_END_WITHID(code, lino);
  }

  return code;
}
2683

2684
/**
 * Handle a "WAL data (new)" trigger pull request.
 *
 * Points the response at the reader's trigger block, lets
 * processWalVerDataNew() fill it for the requested versions/ranges, then
 * serializes the response (with the reader's index hash) and replies with
 * TDMT_STREAM_TRIGGER_PULL_RSP.
 *
 * When no rows were produced the reply carries only the 8-byte version and
 * the code TSDB_CODE_STREAM_NO_DATA; that code is reset to 0 before
 * returning to the caller.
 *
 * @param pVnode            vnode serving the request
 * @param pMsg              incoming RPC message (its info is echoed in the reply)
 * @param req               decoded pull request (walDataNewReq member is read)
 * @param sStreamReaderInfo reader context; may be NULL
 * @return 0 on success / no data, otherwise the error code that was sent
 */
static int32_t vnodeProcessStreamWalDataNewReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t            code = 0;
  int32_t            lino = 0;
  void*              buf = NULL;
  size_t             size = 0;
  int32_t            len = 0;       // signed serializer result, so errors stay visible
  void*              pTask = NULL;  // initialised up front: `end` is reachable before it is assigned
  SSTriggerWalNewRsp resultRsp = {0};

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, request paras size:%zu", TD_VID(pVnode), __func__, taosArrayGetSize(req->walDataNewReq.versions));

  resultRsp.dataBlock = sStreamReaderInfo->triggerBlock;
  STREAM_CHECK_RET_GOTO(processWalVerDataNew(pVnode, sStreamReaderInfo, req->walDataNewReq.versions, req->walDataNewReq.ranges, &resultRsp));
  ST_TASK_DLOG("vgId:%d %s get result last ver:%"PRId64" rows:%d", TD_VID(pVnode), __func__, resultRsp.ver, resultRsp.totalRows);

  STREAM_CHECK_CONDITION_GOTO(resultRsp.totalRows == 0, TDB_CODE_SUCCESS);

  // Two-pass serialization: measure, allocate, then encode.
  // Assumes the serializer returns a negative value on failure, as the
  // group-info serializer in this file is checked for.
  len = tSerializeSStreamWalDataResponse(NULL, 0, &resultRsp, sStreamReaderInfo->indexHash);
  STREAM_CHECK_CONDITION_GOTO(len < 0, len);
  buf = rpcMallocCont(len);
  STREAM_CHECK_NULL_GOTO(buf, terrno);
  len = tSerializeSStreamWalDataResponse(buf, len, &resultRsp, sStreamReaderInfo->indexHash);
  STREAM_CHECK_CONDITION_GOTO(len < 0, len);
  size = (size_t)len;

  printDataBlock(sStreamReaderInfo->triggerBlock, __func__, "data", ((SStreamTask*)pTask)->streamId);
  printIndexHash(sStreamReaderInfo->indexHash, pTask);

end:
  if (resultRsp.totalRows == 0) {
    // NOTE(review): an earlier failure that left totalRows at 0 is still
    // reported as NO_DATA here, exactly as before - confirm this is intended.
    buf = rpcMallocCont(sizeof(int64_t));
    if (buf != NULL) {
      *(int64_t*)buf = resultRsp.ver;
      size = sizeof(int64_t);
      code = TSDB_CODE_STREAM_NO_DATA;
    } else {
      size = 0;
      code = terrno;
    }
  } else if (code != 0) {
    // Never ship a partially built payload together with an error code.
    rpcFreeCont(buf);
    buf = NULL;
    size = 0;
  }

  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  if (code == TSDB_CODE_STREAM_NO_DATA) {
    code = 0;
  }

  blockDataDestroy(resultRsp.deleteBlock);
  blockDataDestroy(resultRsp.dropBlock);
  // The task-scoped log macro presumably reads pTask; skip it when no task is known.
  if (pTask != NULL) {
    STREAM_PRINT_LOG_END_WITHID(code, lino);
  }

  return code;
}
2727

2728
/**
 * Handle a "WAL calc data (new)" trigger pull request.
 *
 * For a virtual-table stream the response is built directly on the reader's
 * calc block (isCalc = true). Otherwise the WAL data is first collected into
 * the trigger block, copied, and transformed into a fresh block shaped like
 * the calc block, which is what gets serialized.
 *
 * When no rows were produced the reply carries only the 8-byte version and
 * the code TSDB_CODE_STREAM_NO_DATA; that code is reset to 0 before
 * returning to the caller.
 *
 * @param pVnode            vnode serving the request
 * @param pMsg              incoming RPC message (its info is echoed in the reply)
 * @param req               decoded pull request (walDataNewReq member is read)
 * @param sStreamReaderInfo reader context; may be NULL
 * @return 0 on success / no data, otherwise the error code that was sent
 */
static int32_t vnodeProcessStreamWalCalcDataNewReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t            code = 0;
  int32_t            lino = 0;
  void*              buf = NULL;
  size_t             size = 0;
  int32_t            len = 0;       // signed serializer result, so errors stay visible
  void*              pTask = NULL;  // initialised up front: `end` is reachable before it is assigned
  SSTriggerWalNewRsp resultRsp = {0};
  SSDataBlock*       pBlock1 = NULL;  // copy of the trigger block including its rows
  SSDataBlock*       pBlock2 = NULL;  // empty block with the calc block's layout

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, request paras size:%zu", TD_VID(pVnode), __func__, taosArrayGetSize(req->walDataNewReq.versions));

  resultRsp.dataBlock = sStreamReaderInfo->isVtableStream ? sStreamReaderInfo->calcBlock : sStreamReaderInfo->triggerBlock;
  resultRsp.isCalc = sStreamReaderInfo->isVtableStream ? true : false;
  STREAM_CHECK_RET_GOTO(processWalVerDataNew(pVnode, sStreamReaderInfo, req->walDataNewReq.versions, req->walDataNewReq.ranges, &resultRsp));
  STREAM_CHECK_CONDITION_GOTO(resultRsp.totalRows == 0, TDB_CODE_SUCCESS);

  if (!sStreamReaderInfo->isVtableStream) {
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerBlock, true, &pBlock1));
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->calcBlock, false, &pBlock2));

    blockDataTransform(pBlock2, pBlock1);
    resultRsp.dataBlock = pBlock2;
  }

  // Two-pass serialization: measure, allocate, then encode.
  // Assumes the serializer returns a negative value on failure, as the
  // group-info serializer in this file is checked for.
  len = tSerializeSStreamWalDataResponse(NULL, 0, &resultRsp, sStreamReaderInfo->indexHash);
  STREAM_CHECK_CONDITION_GOTO(len < 0, len);
  buf = rpcMallocCont(len);
  STREAM_CHECK_NULL_GOTO(buf, terrno);
  len = tSerializeSStreamWalDataResponse(buf, len, &resultRsp, sStreamReaderInfo->indexHash);
  STREAM_CHECK_CONDITION_GOTO(len < 0, len);
  size = (size_t)len;

  printDataBlock(resultRsp.dataBlock, __func__, "data", ((SStreamTask*)pTask)->streamId);
  printIndexHash(sStreamReaderInfo->indexHash, pTask);

end:
  if (resultRsp.totalRows == 0) {
    // NOTE(review): an earlier failure that left totalRows at 0 is still
    // reported as NO_DATA here, exactly as before - confirm this is intended.
    buf = rpcMallocCont(sizeof(int64_t));
    if (buf != NULL) {
      *(int64_t*)buf = resultRsp.ver;
      size = sizeof(int64_t);
      code = TSDB_CODE_STREAM_NO_DATA;
    } else {
      size = 0;
      code = terrno;
    }
  } else if (code != 0) {
    // Never ship a partially built payload together with an error code.
    rpcFreeCont(buf);
    buf = NULL;
    size = 0;
  }

  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  if (code == TSDB_CODE_STREAM_NO_DATA) {
    code = 0;
  }

  blockDataDestroy(pBlock1);
  blockDataDestroy(pBlock2);
  blockDataDestroy(resultRsp.deleteBlock);
  blockDataDestroy(resultRsp.dropBlock);
  // The task-scoped log macro presumably reads pTask; skip it when no task is known.
  if (pTask != NULL) {
    STREAM_PRINT_LOG_END_WITHID(code, lino);
  }

  return code;
}
2782

2783
/**
 * Handle a "group column value" trigger pull request.
 *
 * Looks up the requested group id in the reader's groupIdMap, serializes the
 * stored group info and replies with TDMT_STREAM_TRIGGER_PULL_RSP. On any
 * failure the reply carries the error code and no payload.
 *
 * @param pVnode            vnode serving the request
 * @param pMsg              incoming RPC message (its info is echoed in the reply)
 * @param req               decoded pull request (groupColValueReq member is read)
 * @param sStreamReaderInfo reader context; may be NULL
 * @return 0 on success, TSDB_CODE_STREAM_NO_CONTEXT when the group id is
 *         unknown, or another error code on failure
 */
static int32_t vnodeProcessStreamGroupColValueReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;
  // The serializer result is kept in a signed variable: testing a size_t
  // for "< 0" is always false, so failures used to go unnoticed.
  int32_t len = 0;
  void*   pTask = NULL;  // initialised up front: `end` is reachable before it is assigned

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, request gid:%" PRId64, TD_VID(pVnode), __func__, req->groupColValueReq.gid);

  SArray** gInfo = taosHashGet(sStreamReaderInfo->groupIdMap, &req->groupColValueReq.gid, POINTER_BYTES);
  STREAM_CHECK_NULL_GOTO(gInfo, TSDB_CODE_STREAM_NO_CONTEXT);
  SStreamGroupInfo pGroupInfo = {0};
  pGroupInfo.gInfo = *gInfo;

  // Two-pass serialization: measure, allocate, then encode.
  len = tSerializeSStreamGroupInfo(NULL, 0, &pGroupInfo, TD_VID(pVnode));
  STREAM_CHECK_CONDITION_GOTO(len < 0, len);
  buf = rpcMallocCont(len);
  STREAM_CHECK_NULL_GOTO(buf, terrno);
  len = tSerializeSStreamGroupInfo(buf, len, &pGroupInfo, TD_VID(pVnode));
  STREAM_CHECK_CONDITION_GOTO(len < 0, len);
  size = (size_t)len;

end:
  if (code != 0) {
    rpcFreeCont(buf);
    buf = NULL;
    size = 0;
  }
  // The task-scoped log macro presumably reads pTask; skip it when no task is known.
  if (pTask != NULL) {
    STREAM_PRINT_LOG_END_WITHID(code, lino);
  }
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);

  return code;
}
2817

2818
/**
 * Handle a "virtual table info" trigger pull request.
 *
 * For every table in the reader's table list, reads the table's meta entry
 * and copies column-reference descriptors into the response:
 *   - if the request asks for exactly the primary timestamp column, all of
 *     the table's column references are copied;
 *   - otherwise one slot per requested column id is produced, filled from
 *     the first matching reference that has `hasRef` set (unmatched slots
 *     stay zeroed).
 * The result is serialized by buildVTableInfoRsp() and sent with
 * TDMT_STREAM_TRIGGER_PULL_RSP.
 *
 * @param pVnode            vnode serving the request
 * @param pMsg              incoming RPC message (its info is echoed in the reply)
 * @param req               decoded pull request (virTableInfoReq member is read)
 * @param sStreamReaderInfo reader context; may be NULL
 * @return 0 on success, otherwise the error code that was sent
 */
static int32_t vnodeProcessStreamVTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t              code = 0;
  int32_t              lino = 0;
  void*                buf = NULL;
  size_t               size = 0;
  void*                pTask = NULL;  // initialised up front: `end` is reachable before it is assigned
  SStreamMsgVTableInfo vTableInfo = {0};
  SMetaReader          metaReader = {0};
  SStorageAPI          api = {0};
  initStorageAPI(&api);

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  SArray* cids = req->virTableInfoReq.cids;
  STREAM_CHECK_NULL_GOTO(cids, terrno);

  SArray* pTableListArray = qStreamGetTableArrayList(sStreamReaderInfo->tableList);
  STREAM_CHECK_NULL_GOTO(pTableListArray, terrno);

  vTableInfo.infos = taosArrayInit(taosArrayGetSize(pTableListArray), sizeof(VTableInfo));
  STREAM_CHECK_NULL_GOTO(vTableInfo.infos, terrno);
  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);

  for (size_t i = 0; i < taosArrayGetSize(pTableListArray); i++) {
    STableKeyInfo* pKeyInfo = taosArrayGet(pTableListArray, i);
    if (pKeyInfo == NULL) {
      continue;
    }

    // Read the meta entry first and stop on failure: continuing would copy
    // column references out of a reader that holds no valid entry.
    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, pKeyInfo->uid));

    VTableInfo* vTable = taosArrayReserve(vTableInfo.infos, 1);
    STREAM_CHECK_NULL_GOTO(vTable, terrno);
    vTable->uid = pKeyInfo->uid;
    vTable->gId = pKeyInfo->groupId;
    // Keep the slot in a destroyable state until its array is allocated.
    vTable->cols.nCols = 0;
    vTable->cols.pColRef = NULL;
    vTable->cols.version = metaReader.me.colRef.version;

    if (taosArrayGetSize(cids) == 1 && *(col_id_t*)taosArrayGet(cids, 0) == PRIMARYKEY_TIMESTAMP_COL_ID) {
      vTable->cols.pColRef = taosMemoryCalloc(metaReader.me.colRef.nCols, sizeof(SColRef));
      if (metaReader.me.colRef.nCols > 0) {
        STREAM_CHECK_NULL_GOTO(vTable->cols.pColRef, terrno);
      }
      vTable->cols.nCols = metaReader.me.colRef.nCols;
      for (size_t j = 0; j < metaReader.me.colRef.nCols; j++) {
        memcpy(vTable->cols.pColRef + j, &metaReader.me.colRef.pColRef[j], sizeof(SColRef));
      }
    } else {
      vTable->cols.pColRef = taosMemoryCalloc(taosArrayGetSize(cids), sizeof(SColRef));
      if (taosArrayGetSize(cids) > 0) {
        STREAM_CHECK_NULL_GOTO(vTable->cols.pColRef, terrno);
      }
      vTable->cols.nCols = taosArrayGetSize(cids);
      // k indexes the requested column ids (the original reused the name `i`,
      // shadowing the table loop index).
      for (size_t k = 0; k < taosArrayGetSize(cids); k++) {
        for (size_t j = 0; j < metaReader.me.colRef.nCols; j++) {
          if (metaReader.me.colRef.pColRef[j].hasRef &&
              metaReader.me.colRef.pColRef[j].id == *(col_id_t*)taosArrayGet(cids, k)) {
            memcpy(vTable->cols.pColRef + k, &metaReader.me.colRef.pColRef[j], sizeof(SColRef));
            break;
          }
        }
      }
    }
    tDecoderClear(&metaReader.coder);
  }
  ST_TASK_DLOG("vgId:%d %s end", TD_VID(pVnode), __func__);
  STREAM_CHECK_RET_GOTO(buildVTableInfoRsp(&vTableInfo, &buf, &size));

end:
  tDestroySStreamMsgVTableInfo(&vTableInfo);
  api.metaReaderFn.clearReader(&metaReader);
  // The task-scoped log macro presumably reads pTask; skip it when no task is known.
  if (pTask != NULL) {
    STREAM_PRINT_LOG_END_WITHID(code, lino);
  }
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  return code;
}
2888

2889
/**
 * Handle an "original table info" trigger pull request.
 *
 * For every (table name, column name) pair in the request, resolves the
 * table's uid, its super-table uid (0 for a normal table) and the column id
 * of the named column, then serializes the list with buildOTableInfoRsp()
 * and replies with TDMT_STREAM_TRIGGER_PULL_RSP.
 *
 * A column that cannot be found is logged and reported with cid 0. A table
 * that is neither a child table nor a normal table fails the request with
 * TSDB_CODE_INVALID_PARA.
 *
 * @param pVnode vnode serving the request
 * @param pMsg   incoming RPC message (its info is echoed in the reply)
 * @param req    decoded pull request (origTableInfoReq member is read)
 * @return 0 on success, otherwise the error code that was sent
 */
static int32_t vnodeProcessStreamOTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req) {
  int32_t                   code = 0;
  int32_t                   lino = 0;
  void*                     buf = NULL;
  size_t                    size = 0;
  SSTriggerOrigTableInfoRsp oTableInfo = {0};
  SMetaReader               metaReader = {0};
  int64_t                   streamId = req->base.streamId;
  stsDebug("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStorageAPI api = {0};
  initStorageAPI(&api);

  SArray* cols = req->origTableInfoReq.cols;
  STREAM_CHECK_NULL_GOTO(cols, terrno);

  oTableInfo.cols = taosArrayInit(taosArrayGetSize(cols), sizeof(OTableInfoRsp));
  STREAM_CHECK_NULL_GOTO(oTableInfo.cols, terrno);

  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
  for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
    OTableInfo* oInfo = taosArrayGet(cols, i);
    STREAM_CHECK_NULL_GOTO(oInfo, terrno);
    OTableInfoRsp* vTableInfo = taosArrayReserve(oTableInfo.cols, 1);
    STREAM_CHECK_NULL_GOTO(vTableInfo, terrno);
    // 0 doubles as the "column not found" marker checked below, so set it
    // explicitly instead of relying on the reserved slot's initial content.
    vTableInfo->cid = 0;

    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByName(&metaReader, oInfo->refTableName));
    vTableInfo->uid = metaReader.me.uid;
    stsDebug("vgId:%d %s uid:%"PRId64, TD_VID(pVnode), __func__, vTableInfo->uid);

    SSchemaWrapper* sSchemaWrapper = NULL;
    if (metaReader.me.type == TD_CHILD_TABLE) {
      // A child table's row schema is read from its super table's entry.
      int64_t suid = metaReader.me.ctbEntry.suid;
      vTableInfo->suid = suid;
      tDecoderClear(&metaReader.coder);
      STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, suid));
      sSchemaWrapper = &metaReader.me.stbEntry.schemaRow;
    } else if (metaReader.me.type == TD_NORMAL_TABLE) {
      vTableInfo->suid = 0;
      sSchemaWrapper = &metaReader.me.ntbEntry.schemaRow;
    } else {
      // Previously this only logged and then dereferenced the NULL schema
      // pointer below; fail the request instead.
      stError("invalid table type:%d", metaReader.me.type);
      code = TSDB_CODE_INVALID_PARA;
      lino = __LINE__;
      goto end;
    }

    for (size_t j = 0; j < sSchemaWrapper->nCols; j++) {
      SSchema* s = sSchemaWrapper->pSchema + j;
      if (strcmp(s->name, oInfo->refColName) == 0) {
        vTableInfo->cid = s->colId;
        break;
      }
    }
    if (vTableInfo->cid == 0) {
      stError("vgId:%d %s, not found col %s in table %s", TD_VID(pVnode), __func__, oInfo->refColName,
              oInfo->refTableName);
    }
    tDecoderClear(&metaReader.coder);
  }

  STREAM_CHECK_RET_GOTO(buildOTableInfoRsp(&oTableInfo, &buf, &size));

end:
  tDestroySTriggerOrigTableInfoRsp(&oTableInfo);
  api.metaReaderFn.clearReader(&metaReader);
  STREAM_PRINT_LOG_END(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  return code;
}
2958

2959
/**
 * Handle a "virtual table pseudo-column / tag info" trigger pull request.
 *
 * Builds a one-row data block for the virtual table identified by
 * req->virTablePseudoColReq.uid:
 *   - virtual normal table: the first requested column id must be -1 and the
 *     block holds a single column with the table name;
 *   - virtual child table: one column per requested id; id -1 yields the
 *     table name, other ids are looked up in the super table's tag schema
 *     and filled from the child's tags (ids not in the schema become NULL
 *     columns).
 * The block is serialized with buildRsp() and sent with
 * TDMT_STREAM_TRIGGER_PULL_RSP.
 *
 * @param pVnode vnode serving the request
 * @param pMsg   incoming RPC message (its info is echoed in the reply)
 * @param req    decoded pull request (virTablePseudoColReq member is read)
 * @return 0 on success; TSDB_CODE_STREAM_NO_DATA whenever no payload was
 *         produced (including after an earlier failure)
 */
static int32_t vnodeProcessStreamVTableTagInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req) {
  int32_t      code = 0;
  int32_t      lino = 0;
  void*        buf = NULL;
  size_t       size = 0;
  SSDataBlock* pBlock = NULL;

  SMetaReader metaReader = {0};
  SMetaReader metaReaderStable = {0};
  int64_t     streamId = req->base.streamId;
  stsDebug("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStorageAPI api = {0};
  initStorageAPI(&api);

  SArray* cols = req->virTablePseudoColReq.cids;
  STREAM_CHECK_NULL_GOTO(cols, terrno);

  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
  STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, req->virTablePseudoColReq.uid));

  STREAM_CHECK_CONDITION_GOTO(metaReader.me.type != TD_VIRTUAL_CHILD_TABLE && metaReader.me.type != TD_VIRTUAL_NORMAL_TABLE, TSDB_CODE_INVALID_PARA);

  STREAM_CHECK_RET_GOTO(createDataBlock(&pBlock));
  if (metaReader.me.type == TD_VIRTUAL_NORMAL_TABLE) {
    STREAM_CHECK_CONDITION_GOTO (taosArrayGetSize(cols) < 1 || *(col_id_t*)taosArrayGet(cols, 0) != -1, TSDB_CODE_INVALID_PARA);
    SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN, -1);
    STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, 1));
    pBlock->info.rows = 1;
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, 0);
    STREAM_CHECK_NULL_GOTO(pDst, terrno);
    STREAM_CHECK_RET_GOTO(varColSetVarData(pDst, 0, metaReader.me.name, strlen(metaReader.me.name), false));
  } else if (metaReader.me.type == TD_VIRTUAL_CHILD_TABLE) {
    int64_t suid = metaReader.me.ctbEntry.suid;
    // Release the child reader's lock before opening a second locking reader
    // for the super table; the child's decoded entry is still read below.
    api.metaReaderFn.readerReleaseLock(&metaReader);
    api.metaReaderFn.initReader(&metaReaderStable, pVnode, META_READER_LOCK, &api.metaFn);

    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReaderStable, suid));
    SSchemaWrapper* sSchemaWrapper = &metaReaderStable.me.stbEntry.schemaTag;

    // Pass 1: declare one output column per requested id.
    for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
      col_id_t* id = taosArrayGet(cols, i);
      STREAM_CHECK_NULL_GOTO(id, terrno);
      if (*id == -1) {
        SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN, -1);
        STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
        continue;
      }
      size_t j = 0;
      for (; j < sSchemaWrapper->nCols; j++) {
        SSchema* s = sSchemaWrapper->pSchema + j;
        if (s->colId == *id) {
          SColumnInfoData idata = createColumnInfoData(s->type, s->bytes, s->colId);
          STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
          break;
        }
      }
      if (j == sSchemaWrapper->nCols) {
        // Unknown tag id: keep the column position but mark it as NULL-typed.
        SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_NULL, CHAR_BYTES, *id);
        STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
      }
    }
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, 1));
    pBlock->info.rows = 1;

    // Pass 2: fill row 0 of every column.
    for (size_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
      STREAM_CHECK_NULL_GOTO(pDst, terrno);

      if (pDst->info.colId == -1) {
        STREAM_CHECK_RET_GOTO(varColSetVarData(pDst, 0, metaReader.me.name, strlen(metaReader.me.name), false));
        continue;
      }
      if (pDst->info.type == TSDB_DATA_TYPE_NULL) {
        STREAM_CHECK_RET_GOTO(colDataSetVal(pDst, 0, NULL, true));
        continue;
      }

      STagVal val = {0};
      val.cid = pDst->info.colId;
      const char* p = api.metaFn.extractTagVal(metaReader.me.ctbEntry.pTags, pDst->info.type, &val);

      char* data = NULL;
      if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
        data = tTagValToData((const STagVal*)p, false);
      } else {
        data = (char*)p;
      }

      bool isNull = (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
      // Same condition the original used to decide whether `data` must be freed.
      bool ownsData = (pDst->info.type != TSDB_DATA_TYPE_JSON) && (p != NULL) &&
                      IS_VAR_DATA_TYPE(((const STagVal*)p)->type) && (data != NULL);

      // Free the temporary copy before checking the result so it is not
      // leaked when colDataSetVal() fails and we jump to `end`.
      code = colDataSetVal(pDst, 0, data, isNull);
      if (ownsData) {
        taosMemoryFree(data);
      }
      STREAM_CHECK_CONDITION_GOTO(code != 0, code);
    }
  } else {
    stError("vgId:%d %s, invalid table type:%d", TD_VID(pVnode), __func__, metaReader.me.type);
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  stsDebug("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlock->info.rows);
  printDataBlock(pBlock, __func__, "", streamId);
  STREAM_CHECK_RET_GOTO(buildRsp(pBlock, &buf, &size));

end:
  // NOTE(review): this replaces any earlier error code with NO_DATA whenever
  // no payload was built - unchanged from the original; confirm it is intended.
  if (size == 0) {
    code = TSDB_CODE_STREAM_NO_DATA;
  }
  api.metaReaderFn.clearReader(&metaReaderStable);
  api.metaReaderFn.clearReader(&metaReader);
  STREAM_PRINT_LOG_END(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  blockDataDestroy(pBlock);
  return code;
}
3079

3080
/*
 * Handle a TDMT_STREAM_FETCH message.
 *
 * Deserializes the SResFetchReq carried by pMsg, looks up the calc info entry
 * selected by req.execId, optionally (req.reset) re-creates or resets the
 * executor task, runs the task once and sends a TDMT_STREAM_FETCH_RSP built
 * from the produced result list.
 *
 * A response is sent on every path, including failures. The value returned is
 * the same code that is put into the response. TSDB_CODE_PAR_TABLE_NOT_EXIST
 * and TSDB_CODE_TDB_TABLE_NOT_EXIST are reported as success.
 *
 * @param pVnode  vnode the request is executed on
 * @param pMsg    incoming rpc message; pCont/contLen hold the serialized request
 * @return        0 on success, otherwise the first error that occurred
 */
static int32_t vnodeProcessStreamFetchMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  int32_t            code = 0;
  int32_t            lino = 0;
  int32_t            rspCode = 0;
  void*              buf = NULL;
  size_t             size = 0;
  void*              taskAddr = NULL;
  SArray*            pResList = NULL;
  bool               hasNext = false;
  // Initialised here, not at first use: the early checks below jump to `end`
  // before the task is known, and the cleanup path must be able to tell.
  void*              pTask = NULL;

  SResFetchReq req = {0};
  STREAM_CHECK_CONDITION_GOTO(tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0,
                              TSDB_CODE_QRY_INVALID_INPUT);
  SArray* calcInfoList = (SArray*)qStreamGetReaderInfo(req.queryId, req.taskId, &taskAddr);
  STREAM_CHECK_NULL_GOTO(calcInfoList, terrno);

  STREAM_CHECK_CONDITION_GOTO(req.execId < 0, TSDB_CODE_INVALID_PARA);
  SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo = taosArrayGetP(calcInfoList, req.execId);
  STREAM_CHECK_NULL_GOTO(sStreamReaderCalcInfo, terrno);
  pTask = sStreamReaderCalcInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, execId:%d, reset:%d, pTaskInfo:%p, scan type:%d", TD_VID(pVnode), __func__, req.execId, req.reset,
               sStreamReaderCalcInfo->pTaskInfo, nodeType(sStreamReaderCalcInfo->calcAst->pNode));

  if (req.reset) {
    // The reset path dereferences the runtime function info several times;
    // reject a request that arrived without it instead of crashing.
    STREAM_CHECK_NULL_GOTO(req.pStRtFuncInfo, TSDB_CODE_INVALID_PARA);

    // With a dynamic table name, the uid comes from the first partition
    // value that is flagged as the table name.
    int64_t uid = 0;
    if (req.dynTbname) {
      SArray* vals = req.pStRtFuncInfo->pStreamPartColVals;
      for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
        SStreamGroupValue* pValue = taosArrayGet(vals, i);
        if (pValue != NULL && pValue->isTbname) {
          uid = pValue->uid;
          break;
        }
      }
    }

    SReadHandle handle = {0};
    handle.vnode = pVnode;
    handle.uid = uid;

    initStorageAPI(&handle.api);
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(sStreamReaderCalcInfo->calcAst->pNode) ||
        QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == nodeType(sStreamReaderCalcInfo->calcAst->pNode)) {
      STimeRangeNode* node = (STimeRangeNode*)((STableScanPhysiNode*)(sStreamReaderCalcInfo->calcAst->pNode))->pTimeRange;
      if (node != NULL) {
        STREAM_CHECK_RET_GOTO(processCalaTimeRange(sStreamReaderCalcInfo, &req, node, &handle, false));
      } else {
        ST_TASK_DLOG("vgId:%d %s no scan time range node", TD_VID(pVnode), __func__);
      }

      node = (STimeRangeNode*)((STableScanPhysiNode*)(sStreamReaderCalcInfo->calcAst->pNode))->pExtTimeRange;
      if (node != NULL) {
        STREAM_CHECK_RET_GOTO(processCalaTimeRange(sStreamReaderCalcInfo, &req, node, &handle, true));
      } else {
        ST_TASK_DLOG("vgId:%d %s no interp time range node", TD_VID(pVnode), __func__);
      }
    }

    // Hand the request's runtime function info over to the calc info; the
    // previous content ends up in the request and is released with it.
    TSWAP(sStreamReaderCalcInfo->rtInfo.funcInfo, *req.pStRtFuncInfo);
    sStreamReaderCalcInfo->rtInfo.funcInfo.hasPlaceHolder = sStreamReaderCalcInfo->hasPlaceHolder;
    handle.streamRtInfo = &sStreamReaderCalcInfo->rtInfo;

    if (sStreamReaderCalcInfo->pTaskInfo == NULL || !qNeedReset(sStreamReaderCalcInfo->pTaskInfo)) {
      qDestroyTask(sStreamReaderCalcInfo->pTaskInfo);
      STREAM_CHECK_RET_GOTO(qCreateStreamExecTaskInfo(&sStreamReaderCalcInfo->pTaskInfo,
                                                    sStreamReaderCalcInfo->calcScanPlan, &handle, NULL, TD_VID(pVnode),
                                                    req.taskId));
    } else {
      STREAM_CHECK_RET_GOTO(qResetTableScan(sStreamReaderCalcInfo->pTaskInfo, &handle));
    }

    STREAM_CHECK_RET_GOTO(qSetTaskId(sStreamReaderCalcInfo->pTaskInfo, req.taskId, req.queryId));
  }

  if (req.pOpParam != NULL) {
    qUpdateOperatorParam(sStreamReaderCalcInfo->pTaskInfo, req.pOpParam);
  }

  pResList = taosArrayInit(4, POINTER_BYTES);
  STREAM_CHECK_NULL_GOTO(pResList, terrno);
  uint64_t ts = 0;
  STREAM_CHECK_RET_GOTO(qExecTaskOpt(sStreamReaderCalcInfo->pTaskInfo, pResList, &ts, &hasNext, NULL, req.pOpParam != NULL));

  // Debug dump of every block that was produced.
  for (size_t i = 0; i < taosArrayGetSize(pResList); i++) {
    SSDataBlock* pBlock = taosArrayGetP(pResList, i);
    if (pBlock == NULL) continue;
    printDataBlock(pBlock, __func__, "fetch", ((SStreamTask*)pTask)->streamId);
  }

end:
  // NOTE(review): ST_TASK_DLOG presumably reads the local pTask (its
  // definition is not in this file) - so only use it once pTask is known.
  if (pTask != NULL) {
    ST_TASK_DLOG("vgId:%d %s start to build rsp", TD_VID(pVnode), __func__);
  }

  // Deliberately not STREAM_CHECK_RET_GOTO: we are already at `end`, so a
  // failure here must not jump back to it, and an error recorded earlier must
  // not be replaced by the result of building the response.
  rspCode = streamBuildFetchRsp(pResList, hasNext, &buf, &size, pVnode->config.tsdbCfg.precision);
  if (rspCode != TSDB_CODE_SUCCESS && code == TSDB_CODE_SUCCESS) {
    code = rspCode;
    lino = __LINE__;
  }
  if (pTask != NULL) {
    ST_TASK_DLOG("vgId:%d %s end:", TD_VID(pVnode), __func__);
  }

  taosArrayDestroy(pResList);
  streamReleaseTask(taskAddr);

  // A missing table is not an error for the requester.
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_TDB_TABLE_NOT_EXIST) {
    code = TDB_CODE_SUCCESS;
  }
  STREAM_PRINT_LOG_END(code, lino);
  SRpcMsg rsp = {.msgType = TDMT_STREAM_FETCH_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  tDestroySResFetchReq(&req);
  return code;
}
3191

3192
/*
 * Entry point for messages taken from the vnode's stream reader queue.
 *
 * - If the vnode is not ready for reads, the message is redirected and 0 is
 *   returned.
 * - TDMT_STREAM_FETCH is forwarded to vnodeProcessStreamFetchMsg().
 * - TDMT_STREAM_TRIGGER_PULL is deserialized and dispatched on req.base.type
 *   to the matching vnodeProcessStream*Req() handler. For every type except
 *   STRIGGER_PULL_OTABLE_INFO the reader info is looked up first and, on first
 *   use, its table lists and filter are initialised under the reader's mutex.
 * - Any other message type fails with TSDB_CODE_APP_ERROR.
 *
 * @param pVnode  vnode that owns the queue
 * @param pMsg    message to process
 * @return        0 on success (or after a redirect), otherwise an error code
 */
int32_t vnodeProcessStreamReaderMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  int32_t                   code = 0;
  int32_t                   lino = 0;
  SSTriggerPullRequestUnion req = {0};
  void*                     taskAddr = NULL;

  vDebug("vgId:%d, msg:%p in stream reader queue is processing", pVnode->config.vgId, pMsg);
  if (!syncIsReadyForRead(pVnode->sync)) {
    vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
    return 0;
  }

  if (pMsg->msgType == TDMT_STREAM_FETCH) {
    return vnodeProcessStreamFetchMsg(pVnode, pMsg);
  } else if (pMsg->msgType == TDMT_STREAM_TRIGGER_PULL) {
    // The pull request follows an SMsgHead in the message body.
    void*   pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
    int32_t len = pMsg->contLen - sizeof(SMsgHead);
    STREAM_CHECK_RET_GOTO(tDeserializeSTriggerPullRequest(pReq, len, &req));
    stDebug("vgId:%d %s start, type:%d, streamId:%" PRIx64 ", readerTaskId:%" PRIx64 ", sessionId:%" PRIx64,
            TD_VID(pVnode), __func__, req.base.type, req.base.streamId, req.base.readerTaskId, req.base.sessionId);
    SStreamTriggerReaderInfo* sStreamReaderInfo = (STRIGGER_PULL_OTABLE_INFO == req.base.type) ? NULL : qStreamGetReaderInfo(req.base.streamId, req.base.readerTaskId, &taskAddr);
    if (sStreamReaderInfo != NULL) {
      // Lazy one-time initialisation. The steps are chained by hand instead
      // of with STREAM_CHECK_RET_GOTO so that the mutex is always released
      // before leaving this block, also when one of the steps fails.
      (void)taosThreadMutexLock(&sStreamReaderInfo->mutex);
      if (sStreamReaderInfo->tableList == NULL) {
        code = generateTablistForStreamReader(pVnode, sStreamReaderInfo, false);
        if (code != TSDB_CODE_SUCCESS) {
          lino = __LINE__;
        } else {
          code = generateTablistForStreamReader(pVnode, sStreamReaderInfo, true);
          if (code != TSDB_CODE_SUCCESS) {
            lino = __LINE__;
          } else {
            code = filterInitFromNode(sStreamReaderInfo->pConditions, &sStreamReaderInfo->pFilterInfo, 0, NULL);
            if (code != TSDB_CODE_SUCCESS) {
              lino = __LINE__;
            }
          }
        }
      }
      (void)taosThreadMutexUnlock(&sStreamReaderInfo->mutex);
      if (code != TSDB_CODE_SUCCESS) {
        goto end;
      }
      sStreamReaderInfo->pVnode = pVnode;
    }
    switch (req.base.type) {
      case STRIGGER_PULL_SET_TABLE:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamSetTableReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_LAST_TS:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamLastTsReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_FIRST_TS:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamFirstTsReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_TSDB_META:
      case STRIGGER_PULL_TSDB_META_NEXT:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbMetaReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_TSDB_TS_DATA:
        // sStreamReaderInfo is NULL when the lookup above failed; do not
        // dereference it here. NOTE(review): a NULL reader info is passed on
        // to the non-vtable handler, the same way every other case passes it
        // to its handler - confirm that handler rejects NULL.
        if (sStreamReaderInfo != NULL && sStreamReaderInfo->isVtableStream) {
          STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbTsDataReqVTable(pVnode, pMsg, &req, sStreamReaderInfo));
        } else {
          STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbTsDataReqNonVTable(pVnode, pMsg, &req, sStreamReaderInfo));
        }
        break;
      case STRIGGER_PULL_TSDB_TRIGGER_DATA:
      case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbTriggerDataReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_TSDB_CALC_DATA:
      case STRIGGER_PULL_TSDB_CALC_DATA_NEXT:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbCalcDataReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_TSDB_DATA:
      case STRIGGER_PULL_TSDB_DATA_NEXT:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbVirtalDataReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_GROUP_COL_VALUE:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamGroupColValueReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_VTABLE_INFO:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamVTableInfoReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_VTABLE_PSEUDO_COL:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamVTableTagInfoReq(pVnode, pMsg, &req));
        break;
      case STRIGGER_PULL_OTABLE_INFO:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamOTableInfoReq(pVnode, pMsg, &req));
        break;
      case STRIGGER_PULL_WAL_META_NEW:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalMetaNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_WAL_DATA_NEW:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalDataNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_WAL_META_DATA_NEW:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalMetaDataNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_WAL_CALC_DATA_NEW:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalCalcDataNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      default:
        vError("unknown inner msg type:%d in stream reader queue", req.base.type);
        STREAM_CHECK_RET_GOTO(TSDB_CODE_APP_ERROR);
        break;
    }
  } else {
    vError("unknown msg type:%d in stream reader queue", pMsg->msgType);
    STREAM_CHECK_RET_GOTO(TSDB_CODE_APP_ERROR);
  }
end:

  streamReleaseTask(taskAddr);

  tDestroySTriggerPullRequest(&req);
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc