• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4997

20 Mar 2026 06:10AM UTC coverage: 71.739% (-0.3%) from 72.069%
#4997

push

travis-ci

web-flow
feat: add query phase tracking for SHOW QUERIES (#34706)

148 of 183 new or added lines in 10 files covered. (80.87%)

9273 existing lines in 172 files now uncovered.

244572 of 340921 relevant lines covered (71.74%)

133392941.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.5
/source/dnode/vnode/src/vnd/vnodeStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include <stdbool.h>
17
#include <stdint.h>
18
#include <taos.h>
19
#include <tdef.h>
20
#include "executor.h"
21
#include "nodes.h"
22
#include "osMemPool.h"
23
#include "osMemory.h"
24
#include "scalar.h"
25
#include "stream.h"
26
#include "streamReader.h"
27
#include "taosdef.h"
28
#include "taoserror.h"
29
#include "tarray.h"
30
#include "tcommon.h"
31
#include "tdatablock.h"
32
#include "tdb.h"
33
#include "tdef.h"
34
#include "tencode.h"
35
#include "tglobal.h"
36
#include "thash.h"
37
#include "tlist.h"
38
#include "tlockfree.h"
39
#include "tmsg.h"
40
#include "tsimplehash.h"
41
#include "ttypes.h"
42
#include "vnd.h"
43
#include "vnode.h"
44
#include "vnodeInt.h"
45
#include "executor.h"
46

47
int32_t cacheTag(SVnode* pVnode, SHashObj* metaCache, SExprInfo* pExprInfo, int32_t numOfExpr, SStorageAPI* api, uint64_t uid, col_id_t colId, SRWLatch* lock);
48

49
#define BUILD_OPTION(options, _suid, _ver, _order, startTime, endTime, _schemas, _isSchema, _pSlotList)      \
50
  SStreamOptions                       options = {.suid = _suid,                                                   \
51
                                                  .ver = _ver,                                                     \
52
                                                  .order = _order,                                                 \
53
                                                  .twindows = {.skey = startTime, .ekey = endTime},                \
54
                                                  .schemas = _schemas,                                             \
55
                                                  .isSchema = _isSchema,                                           \
56
                                                  .pSlotList = _pSlotList};
57

58
typedef struct WalMetaResult {
59
  uint64_t    id;
60
  int64_t     skey;
61
  int64_t     ekey;
62
} WalMetaResult;
63

64
static int64_t getSuid(SStreamTriggerReaderInfo* sStreamReaderInfo, STableKeyInfo* pList) {
6,850,602✔
65
  int64_t suid = 0;
6,850,602✔
66
  if (!sStreamReaderInfo->isVtableStream) {
6,850,602✔
67
    suid = sStreamReaderInfo->suid;
6,384,183✔
68
    goto end;
6,384,183✔
69
  }
70

71
  if (pList == NULL) {
466,419✔
72
    goto end;
×
73
  }
74

75
  taosRLockLatch(&sStreamReaderInfo->lock);
466,419✔
76
  SStreamTableMapElement* element = taosHashGet(sStreamReaderInfo->vSetTableList.uIdMap, &pList->uid, LONG_BYTES);  
466,419✔
77
  if (element != 0) {
466,419✔
78
    suid = element->table->groupId;
262,279✔
79
    taosRUnLockLatch(&sStreamReaderInfo->lock);
262,279✔
80
    goto end;
262,279✔
81
  }
82
  taosRUnLockLatch(&sStreamReaderInfo->lock);
204,140✔
83

84
end:
6,850,602✔
85
  return suid;
6,850,602✔
86
}
87

88
static int64_t getSessionKey(int64_t session, int64_t type) { return (session | (type << 32)); }
6,842,288✔
89

90
int32_t sortCid(const void *lp, const void *rp) {
1,316,620✔
91
  int16_t* c1 = (int16_t*)lp;
1,316,620✔
92
  int16_t* c2 = (int16_t*)rp;
1,316,620✔
93

94
  if (*c1 < *c2) {
1,316,620✔
95
    return -1;
1,306,544✔
96
  } else if (*c1 > *c2) {
10,076✔
97
    return 1;
10,076✔
98
  }
99

100
  return 0;
×
101
}
102

103
int32_t sortSSchema(const void *lp, const void *rp) {
1,308,676✔
104
  SSchema* c1 = (SSchema*)lp;
1,308,676✔
105
  SSchema* c2 = (SSchema*)rp;
1,308,676✔
106

107
  if (c1->colId < c2->colId) {
1,308,676✔
108
    return -1;
1,298,600✔
109
  } else if (c1->colId > c2->colId) {
10,076✔
110
    return 1;
10,076✔
111
  }
112

113
  return 0;
×
114
}
115

116
static int32_t addColData(SSDataBlock* pResBlock, int32_t index, void* data) {
36,232,361✔
117
  SColumnInfoData* pSrc = taosArrayGet(pResBlock->pDataBlock, index);
36,232,361✔
118
  if (pSrc == NULL) {
36,238,541✔
119
    return terrno;
×
120
  }
121

122
  memcpy(pSrc->pData + pResBlock->info.rows * pSrc->info.bytes, data, pSrc->info.bytes);
36,238,541✔
123
  return 0;
36,232,374✔
124
}
125

126
static int32_t getTableDataInfo(SStreamReaderTaskInner* pTask, bool* hasNext) {
8,655,940✔
127
  int32_t code = pTask->storageApi->tsdReader.tsdNextDataBlock(pTask->pReader, hasNext);
8,655,940✔
128
  if (code != TSDB_CODE_SUCCESS) {
8,654,317✔
129
    pTask->storageApi->tsdReader.tsdReaderReleaseDataBlock(pTask->pReader);
×
130
  }
131

132
  return code;
8,653,719✔
133
}
134

135
static int32_t getTableData(SStreamReaderTaskInner* pTask, SSDataBlock** ppRes) {
1,229,879✔
136
  return pTask->storageApi->tsdReader.tsdReaderRetrieveDataBlock(pTask->pReader, ppRes);
1,229,879✔
137
}
138

139
static int32_t buildOTableInfoRsp(const SSTriggerOrigTableInfoRsp* rsp, void** data, size_t* size) {
116,568✔
140
  int32_t code = 0;
116,568✔
141
  int32_t lino = 0;
116,568✔
142
  void*   buf = NULL;
116,568✔
143
  int32_t len = tSerializeSTriggerOrigTableInfoRsp(NULL, 0, rsp);
116,568✔
144
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
116,568✔
145
  buf = rpcMallocCont(len);
116,568✔
146
  STREAM_CHECK_NULL_GOTO(buf, terrno);
116,568✔
147
  int32_t actLen = tSerializeSTriggerOrigTableInfoRsp(buf, len, rsp);
116,568✔
148
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
116,568✔
149
  *data = buf;
116,568✔
150
  *size = len;
116,568✔
151
  buf = NULL;
116,568✔
152
end:
116,568✔
153
  rpcFreeCont(buf);
116,568✔
154
  return code;
116,568✔
155
}
156

157
static bool ignoreMetaChange(int64_t tableListVer, int64_t ver) {
141,750✔
158
  stDebug("%s tableListVer:%" PRId64 " ver:%" PRId64, __func__, tableListVer, ver);
141,750✔
159
  return tableListVer >= ver;
141,750✔
160
}
161

162
static bool needReLoadTableList(SStreamTriggerReaderInfo* sStreamReaderInfo, int8_t tableType, int64_t suid, int64_t uid, bool isCalc){
4,066,473✔
163
  if ((tableType == TD_CHILD_TABLE || tableType == TD_VIRTUAL_CHILD_TABLE) &&
4,066,473✔
164
      sStreamReaderInfo->tableType == TD_SUPER_TABLE && 
1,951,748✔
165
      suid == sStreamReaderInfo->suid) {
695,153✔
166
    taosRLockLatch(&sStreamReaderInfo->lock);
10,296✔
167
    uint64_t gid = qStreamGetGroupIdFromOrigin(sStreamReaderInfo, uid);
10,296✔
168
    taosRUnLockLatch(&sStreamReaderInfo->lock);
10,296✔
169
    if (gid == (uint64_t)-1) return true;
10,296✔
170
  }
171
  return false;
4,056,474✔
172
}
173

174
static bool uidInTableList(SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t suid, int64_t uid, uint64_t* id){
11,584,808✔
175
  int32_t  ret = false;
11,584,808✔
176
  if (sStreamReaderInfo->tableType == TD_SUPER_TABLE) {
11,584,808✔
177
    if (suid != sStreamReaderInfo->suid) goto end;
6,649,243✔
178
    if (qStreamGetTableListNum(sStreamReaderInfo) == 0) goto end;
3,291,358✔
179
  } 
180
  *id = qStreamGetGroupIdFromOrigin(sStreamReaderInfo, uid);
8,223,594✔
181
  if (*id == -1) goto end;
8,222,997✔
182
  ret = true;
5,170,734✔
183

184
end:
11,585,499✔
185
  stTrace("%s ret:%d %p %p check suid:%" PRId64 " uid:%" PRId64 " gid:%"PRIu64, __func__, ret, sStreamReaderInfo, sStreamReaderInfo->tableList.gIdMap, suid, uid, *id);
11,585,499✔
186
  return ret;
11,582,972✔
187
}
188

189
static bool uidInTableListOrigin(SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t suid, int64_t uid, uint64_t* id) {
32,635✔
190
  return uidInTableList(sStreamReaderInfo, suid, uid, id);
32,635✔
191
}
192

193
static bool uidInTableListSet(SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t suid, int64_t uid, uint64_t* id, bool isCalc) {
49,844,046✔
194
  bool ret = false;
49,844,046✔
195
  taosRLockLatch(&sStreamReaderInfo->lock);
49,844,046✔
196
  if (sStreamReaderInfo->isVtableStream) {
49,836,403✔
197
    int64_t tmp[2] = {suid, uid};
38,296,304✔
198
    if(tSimpleHashGet(isCalc ? sStreamReaderInfo->uidHashCalc : sStreamReaderInfo->uidHashTrigger, tmp, sizeof(tmp)) != NULL) {
38,296,304✔
199
      *id = uid;
13,001,193✔
200
      ret = true;
13,001,193✔
201
    }
202
  } else {
203
    ret = uidInTableList(sStreamReaderInfo, suid, uid, id);
11,552,654✔
204
  }
205

206
end:
49,842,437✔
207
  taosRUnLockLatch(&sStreamReaderInfo->lock);
49,842,437✔
208
  return ret;
49,846,861✔
209
}
210

211
static int32_t  qTransformStreamTableList(SStreamTriggerReaderInfo* sStreamReaderInfo, void* pTableListInfo, StreamTableListInfo* tableInfo){
266,537✔
212
  SArray* pList = qStreamGetTableListArray(pTableListInfo);
266,537✔
213
  int32_t totalSize = taosArrayGetSize(pList);
266,537✔
214
  int32_t code = 0;
266,537✔
215
  void* pTask = sStreamReaderInfo->pTask;
266,537✔
216
  for (int32_t i = 0; i < totalSize; ++i) {
703,915✔
217
    STableKeyInfo* info = taosArrayGet(pList, i);
437,378✔
218
    if (info == NULL) {
437,378✔
219
      continue;
×
220
    }
221
    code = cacheTag(sStreamReaderInfo->pVnode, sStreamReaderInfo->pTableMetaCacheTrigger, sStreamReaderInfo->pExprInfoTriggerTag, sStreamReaderInfo->numOfExprTriggerTag, &sStreamReaderInfo->storageApi, info->uid, 0, NULL);
437,378✔
222
    if (code != 0){
437,378✔
223
      ST_TASK_WLOG("%s cacheTag trigger failed for uid:%" PRId64",code:%d", __func__, info->uid, code);
×
224
      continue;
×
225
    }
226
    code = cacheTag(sStreamReaderInfo->pVnode, sStreamReaderInfo->pTableMetaCacheCalc, sStreamReaderInfo->pExprInfoCalcTag, sStreamReaderInfo->numOfExprCalcTag, &sStreamReaderInfo->storageApi, info->uid, 0, NULL);
437,378✔
227
    if (code != 0){
437,378✔
228
      ST_TASK_WLOG("%s cacheTag calc failed for uid:%" PRId64",code:%d", __func__, info->uid, code);
×
229
      continue;
×
230
    }
231
    code = qStreamSetTableList(tableInfo, info->uid, info->groupId);
437,378✔
232
    if (code != 0){
437,378✔
233
      return code;
×
234
    }
235
  }
236
  return 0;
266,537✔
237
}
238

239
static int32_t generateTablistForStreamReader(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamReaderInfo) {
266,211✔
240
  int32_t                   code = 0;
266,211✔
241
  int32_t                   lino = 0;
266,211✔
242
  SNodeList* groupNew = NULL;   
266,211✔
243
  void* pTableListInfo = NULL;
266,518✔
244

245
  
246
  STREAM_CHECK_RET_GOTO(nodesCloneList(sStreamReaderInfo->partitionCols, &groupNew));
266,518✔
247

248
  STREAM_CHECK_RET_GOTO(qStreamCreateTableListForReader(pVnode, sStreamReaderInfo->suid, sStreamReaderInfo->uid, sStreamReaderInfo->tableType, groupNew,
266,114✔
249
                                         true, sStreamReaderInfo->pTagCond, sStreamReaderInfo->pTagIndexCond, &sStreamReaderInfo->storageApi, 
250
                                         &pTableListInfo, sStreamReaderInfo->groupIdMap));
251
  
252
  STREAM_CHECK_RET_GOTO(qTransformStreamTableList(sStreamReaderInfo, pTableListInfo, &sStreamReaderInfo->tableList));
266,537✔
253
  
254
  void* pTask = sStreamReaderInfo->pTask;
266,537✔
255
  ST_TASK_DLOG("vgId:%d %s tablelist size:%" PRIzu, TD_VID(pVnode), __func__, taosArrayGetSize(sStreamReaderInfo->tableList.pTableList));
266,537✔
256
end:
266,079✔
257
  nodesDestroyList(groupNew);
266,537✔
258
  qStreamDestroyTableList(pTableListInfo);
266,537✔
259
  STREAM_PRINT_LOG_END(code, lino);
266,537✔
260
  return code;
266,537✔
261
}
262

263
static int32_t buildVTableInfoRsp(const SStreamMsgVTableInfo* rsp, void** data, size_t* size) {
66,540✔
264
  int32_t code = 0;
66,540✔
265
  int32_t lino = 0;
66,540✔
266
  void*   buf = NULL;
66,540✔
267
  int32_t len = tSerializeSStreamMsgVTableInfo(NULL, 0, rsp);
66,540✔
268
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
66,540✔
269
  buf = rpcMallocCont(len);
66,540✔
270
  STREAM_CHECK_NULL_GOTO(buf, terrno);
66,540✔
271
  int32_t actLen = tSerializeSStreamMsgVTableInfo(buf, len, rsp);
66,540✔
272
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
66,540✔
273
  *data = buf;
66,540✔
274
  *size = len;
66,540✔
275
  buf = NULL;
66,540✔
276
end:
66,540✔
277
  rpcFreeCont(buf);
66,540✔
278
  return code;
66,540✔
279
}
280

281
static int32_t buildTsRsp(const SStreamTsResponse* tsRsp, void** data, size_t* size) {
465,480✔
282
  int32_t code = 0;
465,480✔
283
  int32_t lino = 0;
465,480✔
284
  void*   buf = NULL;
465,480✔
285
  int32_t len = tSerializeSStreamTsResponse(NULL, 0, tsRsp);
465,480✔
286
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
464,796✔
287
  buf = rpcMallocCont(len);
464,796✔
288
  STREAM_CHECK_NULL_GOTO(buf, terrno);
464,784✔
289
  int32_t actLen = tSerializeSStreamTsResponse(buf, len, tsRsp);
464,784✔
290
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
465,713✔
291
  *data = buf;
465,713✔
292
  *size = len;
465,713✔
293
  buf = NULL;
464,787✔
294
end:
464,787✔
295
  rpcFreeCont(buf);
464,787✔
296
  return code;
465,023✔
297
}
298

299

300
static int32_t buildRsp(SSDataBlock* pBlock, void** data, size_t* size) {
7,820,922✔
301
  int32_t code = 0;
7,820,922✔
302
  int32_t lino = 0;
7,820,922✔
303
  void*   buf = NULL;
7,820,922✔
304
  STREAM_CHECK_CONDITION_GOTO(pBlock == NULL || pBlock->info.rows == 0, TSDB_CODE_SUCCESS);
7,820,922✔
305
  size_t dataEncodeSize = blockGetEncodeSize(pBlock);
1,654,977✔
306
  buf = rpcMallocCont(dataEncodeSize);
1,655,210✔
307
  STREAM_CHECK_NULL_GOTO(buf, terrno);
1,655,210✔
308
  int32_t actualLen = blockEncode(pBlock, buf, dataEncodeSize, taosArrayGetSize(pBlock->pDataBlock));
1,655,210✔
309
  STREAM_CHECK_CONDITION_GOTO(actualLen < 0, terrno);
1,655,022✔
310
  *data = buf;
1,655,022✔
311
  *size = dataEncodeSize;
1,655,022✔
312
  buf = NULL;
1,655,022✔
313
end:
7,823,194✔
314
  rpcFreeCont(buf);
7,823,194✔
315
  return code;
7,821,097✔
316
}
317

318
static int32_t buildArrayRsp(SArray* pBlockList, void** data, size_t* size) {
55,570✔
319
  int32_t code = 0;
55,570✔
320
  int32_t lino = 0;
55,570✔
321

322
  void*   buf = NULL;
55,570✔
323

324
  int32_t blockNum = 0;
55,570✔
325
  size_t  dataEncodeBufSize = 0;
55,570✔
326
  for(size_t i = 0; i < taosArrayGetSize(pBlockList); i++){
116,312✔
327
    SSDataBlock* pBlock = taosArrayGetP(pBlockList, i);
60,742✔
328
    if (pBlock == NULL || pBlock->info.rows == 0) continue;
60,742✔
329
    int32_t blockSize = blockGetEncodeSize(pBlock);
60,742✔
330
    dataEncodeBufSize += blockSize;
60,742✔
331
    blockNum++;
60,742✔
332
  }
333
  buf = rpcMallocCont(INT_BYTES + dataEncodeBufSize);
55,570✔
334
  STREAM_CHECK_NULL_GOTO(buf, terrno);
55,570✔
335

336
  char* dataBuf = (char*)buf;
55,570✔
337
  *((int32_t*)(dataBuf)) = blockNum;
55,570✔
338
  dataBuf += INT_BYTES;
55,570✔
339
  for(size_t i = 0; i < taosArrayGetSize(pBlockList); i++){
116,079✔
340
    SSDataBlock* pBlock = taosArrayGetP(pBlockList, i);
60,742✔
341
    if (pBlock == NULL || pBlock->info.rows == 0) continue;
60,742✔
342
    int32_t actualLen = blockEncode(pBlock, dataBuf, dataEncodeBufSize, taosArrayGetSize(pBlock->pDataBlock));
60,742✔
343
    STREAM_CHECK_CONDITION_GOTO(actualLen < 0, terrno);
60,509✔
344
    dataBuf += actualLen;
60,509✔
345
  }
346
  *data = buf;
55,337✔
347
  *size = INT_BYTES + dataEncodeBufSize;
55,337✔
348
  buf = NULL;
55,570✔
349
end:
55,570✔
350
  rpcFreeCont(buf);
55,570✔
351
  return code;
55,337✔
352
}
353

354
static int32_t buildWalMetaBlock(SSDataBlock* pBlock, int8_t type, int64_t id, bool isVTable, int64_t uid,
×
355
                                 int64_t skey, int64_t ekey, int64_t ver, int64_t rows) {
356
  int32_t code = 0;
×
357
  int32_t lino = 0;
×
358
  int32_t index = 0;
×
359
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &type));
×
360
  if (!isVTable) {
×
361
    STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &id));
×
362
  }
363
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &uid));
×
364
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &skey));
×
365
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &ekey));
×
366
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &ver));
×
367
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &rows));
×
368

369
end:
×
370
  // STREAM_PRINT_LOG_END(code, lino)
371
  return code;
×
372
}
373

374
static int32_t buildWalMetaBlockNew(SSDataBlock* pBlock, int64_t id, int64_t skey, int64_t ekey, int64_t ver) {
8,807,590✔
375
  int32_t code = 0;
8,807,590✔
376
  int32_t lino = 0;
8,807,590✔
377
  int32_t index = 0;
8,807,590✔
378
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &id));
8,807,590✔
379
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &skey));
8,808,586✔
380
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &ekey));
8,809,447✔
381
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &ver));
8,810,140✔
382

383
end:
8,810,375✔
384
  return code;
8,810,375✔
385
}
386

387
static int32_t buildTableBlock(SSDataBlock* pBlock, int64_t id, int64_t ver, ETableBlockType type) {
1,986✔
388
  int32_t code = 0;
1,986✔
389
  int32_t lino = 0;
1,986✔
390
  int32_t index = 0;
1,986✔
391
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &id));
1,986✔
392
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &ver));
1,986✔
393
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &type));
1,986✔
394

395
end:
1,986✔
396
  return code;
1,986✔
397
}
398

399
static void buildTSchema(STSchema* pTSchema, int32_t ver, col_id_t colId, int8_t type, int32_t bytes) {
×
400
  pTSchema->numOfCols = 1;
×
401
  pTSchema->version = ver;
×
402
  pTSchema->columns[0].colId = colId;
×
403
  pTSchema->columns[0].type = type;
×
404
  pTSchema->columns[0].bytes = bytes;
×
405
}
×
406

407
static int32_t scanDeleteDataNew(SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* rsp, void* data, int32_t len,
29,296✔
408
                              int64_t ver) {
409
  int32_t    code = 0;
29,296✔
410
  int32_t    lino = 0;
29,296✔
411
  SDecoder   decoder = {0};
29,296✔
412
  SDeleteRes req = {0};
29,296✔
413
  void* pTask = sStreamReaderInfo->pTask;
29,296✔
414

415
  req.uidList = taosArrayInit(0, sizeof(tb_uid_t));
29,296✔
416
  tDecoderInit(&decoder, data, len);
29,296✔
417
  STREAM_CHECK_RET_GOTO(tDecodeDeleteRes(&decoder, &req));
29,296✔
418
  STREAM_CHECK_CONDITION_GOTO((sStreamReaderInfo->tableType == TSDB_SUPER_TABLE && !sStreamReaderInfo->isVtableStream && req.suid != sStreamReaderInfo->suid), TDB_CODE_SUCCESS);
29,296✔
419
  
420
  for (int32_t i = 0; i < taosArrayGetSize(req.uidList); i++) {
45,078✔
421
    uint64_t* uid = taosArrayGet(req.uidList, i);
25,864✔
422
    STREAM_CHECK_NULL_GOTO(uid, terrno);
25,864✔
423
    uint64_t   id = 0;
25,864✔
424
    ST_TASK_DLOG("stream reader scan delete start data:uid %" PRIu64 ", skey %" PRIu64 ", ekey %" PRIu64, *uid, req.skey, req.ekey);
25,864✔
425
    STREAM_CHECK_CONDITION_GOTO(!uidInTableListSet(sStreamReaderInfo, req.suid, *uid, &id, false), TDB_CODE_SUCCESS);
25,864✔
426
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->deleteBlock, ((SSDataBlock*)rsp->deleteBlock)->info.rows + 1));
19,214✔
427
    STREAM_CHECK_RET_GOTO(buildWalMetaBlockNew(rsp->deleteBlock, id, req.skey, req.ekey, ver));
19,214✔
428
    ((SSDataBlock*)rsp->deleteBlock)->info.rows++;
19,214✔
429
    rsp->totalRows++;
19,214✔
430
  }
431

432
end:
29,296✔
433
  taosArrayDestroy(req.uidList);
29,296✔
434
  tDecoderClear(&decoder);
29,296✔
435
  return code;
29,296✔
436
}
437

438
static int32_t createBlockForProcessMeta(SSDataBlock** pBlock) {
1,526✔
439
  int32_t code = 0;
1,526✔
440
  int32_t lino = 0;
1,526✔
441
  SArray* schemas = NULL;
1,526✔
442

443
  schemas = taosArrayInit(8, sizeof(SSchema));
1,526✔
444
  STREAM_CHECK_NULL_GOTO(schemas, terrno);
1,526✔
445

446
  int32_t index = 0;
1,526✔
447
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // gid non vtable/uid vtable
1,526✔
448
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // ver
1,526✔
449
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TINYINT, CHAR_BYTES, index++))  // type
1,526✔
450

451
  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, pBlock));
1,526✔
452

453
end:
1,526✔
454
  taosArrayDestroy(schemas);
1,526✔
455
  return code;
1,526✔
456
}
457

458
static int32_t addOneRow(void** tmp, int64_t id, int64_t ver, ETableBlockType type) {
1,986✔
459
  int32_t  code = 0;
1,986✔
460
  int32_t  lino = 0;
1,986✔
461
  if (*tmp == NULL) {
1,986✔
462
    STREAM_CHECK_RET_GOTO(createBlockForProcessMeta((SSDataBlock**)tmp));
1,526✔
463
  }
464
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(*tmp, ((SSDataBlock*)(*tmp))->info.rows + 1));
1,986✔
465
  STREAM_CHECK_RET_GOTO(buildTableBlock(*tmp, id, ver, type));
1,986✔
466
  ((SSDataBlock*)(*tmp))->info.rows++;
1,986✔
467
  
468
end:
1,986✔
469
  return code;
1,986✔
470
}
471

472
static int32_t addUidListToBlock(SArray* uidListAdd, void** block, int64_t ver, int32_t* totalRows, ETableBlockType type) {
86,050✔
473
  for (int32_t i = 0; i < taosArrayGetSize(uidListAdd); ++i) {
88,036✔
474
    uint64_t* uid = taosArrayGet(uidListAdd, i);
1,986✔
475
    if (uid == NULL) {
1,986✔
476
      continue;
×
477
    }
478
    int32_t code = addOneRow(block, *uid, ver, type);
1,986✔
479
    if (code != 0) {
1,986✔
480
      return code;
×
481
    }
482
    (*totalRows)++;
1,986✔
483
  }
484
  return 0;
86,050✔
485
}
486

487
static int32_t qStreamGetAddTable(SStreamTriggerReaderInfo* sStreamReaderInfo, SArray* tableListAdd, SArray* uidListAdd) {
24,334✔
488
  int32_t      code = 0;
24,334✔
489
  int32_t      lino = 0;
24,334✔
490
  if (uidListAdd == NULL) {
24,334✔
491
    return 0;
19,666✔
492
  }
493
  void* pTask = sStreamReaderInfo->pTask;
4,668✔
494
  
495
  taosRLockLatch(&sStreamReaderInfo->lock);
4,668✔
496
  int32_t totalSize = taosArrayGetSize(tableListAdd);
4,668✔
497
  for (int32_t i = 0; i < totalSize; ++i) {
5,330✔
498
    STableKeyInfo* info = taosArrayGet(tableListAdd, i);
662✔
499
    if (info == NULL) {
662✔
500
      continue;
×
501
    }
502
    if (taosHashGet(sStreamReaderInfo->tableList.uIdMap, &info->uid, LONG_BYTES) != NULL) {
662✔
503
      continue;
×
504
    }
505
    STREAM_CHECK_NULL_GOTO(taosArrayPush(uidListAdd, &info->uid), terrno);
1,324✔
506
    ST_TASK_WLOG("%s real add table to list for uid:%" PRId64, __func__, info->uid);
662✔
507
  }
508

509
end:
4,668✔
510
  taosRUnLockLatch(&sStreamReaderInfo->lock);
4,668✔
511
  return code;
4,668✔
512
}
513

514
static int32_t qStreamGetDelTable(SStreamTriggerReaderInfo* sStreamReaderInfo, SArray* tableListDel, SArray* uidListDel) {
30,628✔
515
  int32_t      code = 0;
30,628✔
516
  int32_t      lino = 0;
30,628✔
517
  if (uidListDel == NULL) {
30,628✔
518
    return 0;
×
519
  }
520
  void* pTask = sStreamReaderInfo->pTask;
30,628✔
521
  
522
  taosRLockLatch(&sStreamReaderInfo->lock);
30,628✔
523
  int32_t totalSize = taosArrayGetSize(tableListDel);
30,628✔
524
  for (int32_t i = 0; i < totalSize; ++i) {
31,492✔
525
    int64_t* uid = taosArrayGet(tableListDel, i);
864✔
526
    if (uid == NULL) {
864✔
527
      continue;
×
528
    }
529
    if (taosHashGet(sStreamReaderInfo->tableList.uIdMap, uid, LONG_BYTES) == NULL) {
864✔
530
      continue;
×
531
    }
532
    STREAM_CHECK_NULL_GOTO(taosArrayPush(uidListDel, uid), terrno);
864✔
533
    ST_TASK_WLOG("%s real del table from list for uid:%" PRId64, __func__, *uid);
864✔
534
  }
535

536
end:
30,628✔
537
  taosRUnLockLatch(&sStreamReaderInfo->lock);
30,628✔
538
  return code;
30,628✔
539
}
540

541
static int32_t scanDropTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* rsp, void* data, int32_t len,
30,628✔
542
                             int64_t ver) {
543
  int32_t  code = 0;
30,628✔
544
  int32_t  lino = 0;
30,628✔
545
  SDecoder decoder = {0};
30,628✔
546
  void* pTask = sStreamReaderInfo->pTask;
30,628✔
547
  SArray* uidList = NULL;
30,628✔
548
  SArray* uidListDel = NULL;
30,628✔
549
  SArray* uidListDelOutTbl = NULL;
30,628✔
550
  SVDropTbBatchReq req = {0};
30,628✔
551
  tDecoderInit(&decoder, data, len);
30,628✔
552
  STREAM_CHECK_RET_GOTO(tDecodeSVDropTbBatchReq(&decoder, &req));
30,628✔
553

554
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
61,256✔
555
    SVDropTbReq* pDropTbReq = req.pReqs + iReq;
30,628✔
556
    STREAM_CHECK_NULL_GOTO(pDropTbReq, TSDB_CODE_INVALID_PARA);
30,628✔
557
    uint64_t id = 0;
30,628✔
558
    if(!uidInTableListOrigin(sStreamReaderInfo, pDropTbReq->suid, pDropTbReq->uid, &id)) {
30,628✔
559
      continue;
29,764✔
560
    }
561

562
    if (sStreamReaderInfo->deleteOutTbl != 0) {
864✔
563
      if (uidListDelOutTbl == NULL) {
×
564
        uidListDelOutTbl = taosArrayInit(8, sizeof(tb_uid_t));
×
565
        STREAM_CHECK_NULL_GOTO(uidListDelOutTbl, terrno);
×
566
      }
567
      STREAM_CHECK_NULL_GOTO(taosArrayPush(uidListDelOutTbl, &pDropTbReq->uid), terrno);
×
568
    }
569
    if (sStreamReaderInfo->isVtableStream) {
864✔
570
      if (uidList == NULL) {
864✔
571
        uidList = taosArrayInit(8, sizeof(tb_uid_t));
864✔
572
        STREAM_CHECK_NULL_GOTO(uidList, terrno);
864✔
573
      }
574
      STREAM_CHECK_NULL_GOTO(taosArrayPush(uidList, &pDropTbReq->uid), terrno);
1,728✔
575
    }
576
    
577
    ST_TASK_DLOG("stream reader scan drop uid %" PRId64 ", id %" PRIu64, pDropTbReq->uid, id);
864✔
578
  }
579
  STREAM_CHECK_RET_GOTO(addUidListToBlock(uidListDelOutTbl, &rsp->tableBlock, ver, &rsp->totalRows, TABLE_BLOCK_DROP));
30,628✔
580

581
  if (sStreamReaderInfo->isVtableStream) {
30,628✔
582
    uidListDel = taosArrayInit(8, sizeof(tb_uid_t));
30,628✔
583
    STREAM_CHECK_NULL_GOTO(uidListDel, terrno);
30,628✔
584
    STREAM_CHECK_RET_GOTO(qStreamGetDelTable(sStreamReaderInfo, uidList, uidListDel));
30,628✔
585
    STREAM_CHECK_RET_GOTO(addUidListToBlock(uidListDel, &rsp->tableBlock, ver, &rsp->totalRows, TABLE_BLOCK_RETIRE));
30,628✔
586
  }
587
  
588
end:
30,628✔
589
  taosArrayDestroy(uidList);
30,628✔
590
  taosArrayDestroy(uidListDel);
30,628✔
591
  taosArrayDestroy(uidListDelOutTbl);
30,628✔
592
  tDecoderClear(&decoder);
30,628✔
593
  return code;
30,628✔
594
}
595

596
static int32_t qStreamModifyTableList(SStreamTriggerReaderInfo* sStreamReaderInfo, SArray* tableListAdd, SArray* tableListDel) {
25,760✔
597
  int32_t      code = 0;
25,760✔
598
  int32_t      lino = 0;
25,760✔
599
  void* pTask = sStreamReaderInfo->pTask;
25,760✔
600
  
601
  taosWLockLatch(&sStreamReaderInfo->lock);
25,760✔
602
  int32_t totalSize = taosArrayGetSize(tableListDel);
25,760✔
603
  for (int32_t i = 0; i < totalSize; ++i) {
25,760✔
604
    int64_t* uid = taosArrayGet(tableListDel, i);
×
605
    if (uid == NULL) {
×
606
      continue;
×
607
    }
608
    STREAM_CHECK_RET_GOTO(qStreamRemoveTableList(&sStreamReaderInfo->tableList, *uid));
×
609
  }
610

611
  totalSize = taosArrayGetSize(tableListAdd);
25,760✔
612
  for (int32_t i = 0; i < totalSize; ++i) {
34,996✔
613
    STableKeyInfo* info = taosArrayGet(tableListAdd, i);
9,236✔
614
    if (info == NULL) {
9,236✔
615
      continue;
×
616
    }
617
    int ret = cacheTag(sStreamReaderInfo->pVnode, sStreamReaderInfo->pTableMetaCacheTrigger, sStreamReaderInfo->pExprInfoTriggerTag, sStreamReaderInfo->numOfExprTriggerTag, &sStreamReaderInfo->storageApi, info->uid, 0, NULL);
9,236✔
618
    if (ret != 0){
9,236✔
619
      ST_TASK_WLOG("%s cacheTag trigger failed for uid:%" PRId64",code:%d", __func__, info->uid, ret);
1,955✔
620
      continue;
1,955✔
621
    }
622
    ret = cacheTag(sStreamReaderInfo->pVnode, sStreamReaderInfo->pTableMetaCacheCalc, sStreamReaderInfo->pExprInfoCalcTag, sStreamReaderInfo->numOfExprCalcTag, &sStreamReaderInfo->storageApi, info->uid, 0, NULL);
7,281✔
623
    if (ret != 0){
7,281✔
624
      ST_TASK_WLOG("%s cacheTag calc failed for uid:%" PRId64",code:%d", __func__, info->uid, ret);
×
625
      continue;
×
626
    }
627
    STREAM_CHECK_RET_GOTO(qStreamRemoveTableList(&sStreamReaderInfo->tableList, info->uid));
7,281✔
628
    STREAM_CHECK_RET_GOTO(qStreamSetTableList(&sStreamReaderInfo->tableList, info->uid, info->groupId));
7,281✔
629
  }
630

631
end:
25,760✔
632
  taosWUnLockLatch(&sStreamReaderInfo->lock);
25,760✔
633
  return code;
25,760✔
634
}
635

636
static int32_t processTableList(SStreamTriggerReaderInfo* sStreamReaderInfo, SArray* uidList, SArray** tableList) {
25,760✔
637
  int32_t code = 0;
25,760✔
638
  int32_t lino = 0;
25,760✔
639
  SNodeList* groupNew = NULL;   
25,760✔
640

641
  if (taosArrayGetSize(uidList) == 0) {
25,760✔
642
    return 0;
16,524✔
643
  }
644
  STREAM_CHECK_RET_GOTO(nodesCloneList(sStreamReaderInfo->partitionCols, &groupNew));  
9,236✔
645
  STREAM_CHECK_RET_GOTO(qStreamFilterTableListForReader(sStreamReaderInfo->pVnode, uidList, groupNew, sStreamReaderInfo->pTagCond,
9,236✔
646
                                                    sStreamReaderInfo->pTagIndexCond, &sStreamReaderInfo->storageApi,
647
                                                    sStreamReaderInfo->groupIdMap, sStreamReaderInfo->suid, tableList));
648

649
end:
9,236✔
650
  nodesDestroyList(groupNew);
9,236✔
651
  return code;
9,236✔
652
}
653

654
static int32_t scanCreateTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* rsp, void* data, int32_t len,
24,334✔
655
                             int64_t ver) {
656
  int32_t  code = 0;
24,334✔
657
  int32_t  lino = 0;
24,334✔
658
  SDecoder decoder = {0};
24,334✔
659
  SArray*  uidList = NULL;
24,334✔
660
  SArray*  tableList = NULL;
24,334✔
661
  SArray*  uidListAdd = NULL;
24,334✔
662
  void* pTask = sStreamReaderInfo->pTask;
24,334✔
663

664
  SVCreateTbBatchReq req = {0};
24,334✔
665
  tDecoderInit(&decoder, data, len);
24,334✔
666
  
667
  STREAM_CHECK_RET_GOTO(tDecodeSVCreateTbBatchReq(&decoder, &req));
24,334✔
668

669
  uidList = taosArrayInit(8, sizeof(tb_uid_t));
24,334✔
670
  STREAM_CHECK_NULL_GOTO(uidList, terrno);
24,334✔
671

672
  if (sStreamReaderInfo->isVtableStream) {
24,334✔
673
    uidListAdd = taosArrayInit(8, sizeof(tb_uid_t));
4,668✔
674
    STREAM_CHECK_NULL_GOTO(uidListAdd, terrno);
4,668✔
675
  }
676
  
677
  SVCreateTbReq* pCreateReq = NULL;
24,334✔
678
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
48,668✔
679
    pCreateReq = req.pReqs + iReq;
24,334✔
680
    if (!needReLoadTableList(sStreamReaderInfo, pCreateReq->type, pCreateReq->ctb.suid, pCreateReq->uid, false)) {
24,334✔
681
      ST_TASK_DLOG("stream reader scan create table jump, %s", pCreateReq->name);
16,524✔
682
      continue;
16,524✔
683
    }
684
    ST_TASK_ILOG("stream reader scan create table %s", pCreateReq->name);
7,810✔
685
    STREAM_CHECK_NULL_GOTO(taosArrayPush(uidList, &pCreateReq->uid), terrno);
15,620✔
686
  }
687
  
688
  STREAM_CHECK_RET_GOTO(processTableList(sStreamReaderInfo, uidList, &tableList));
24,334✔
689
  STREAM_CHECK_RET_GOTO(qStreamGetAddTable(sStreamReaderInfo, tableList, uidListAdd));
24,334✔
690
  STREAM_CHECK_RET_GOTO(addUidListToBlock(uidListAdd, &rsp->tableBlock, ver, &rsp->totalRows, TABLE_BLOCK_ADD));
24,334✔
691

692
  STREAM_CHECK_RET_GOTO(qStreamModifyTableList(sStreamReaderInfo, tableList, uidList));
24,334✔
693
end:
24,334✔
694
  taosArrayDestroy(uidList);
24,334✔
695
  taosArrayDestroy(uidListAdd);
24,334✔
696
  taosArrayDestroy(tableList);
24,334✔
697
  tDeleteSVCreateTbBatchReq(&req);
24,334✔
698
  tDecoderClear(&decoder);
24,334✔
699
  return code;
24,334✔
700
}
701

702
static int32_t processAutoCreateTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, SVCreateTbReq* pCreateReq, int64_t ver) {
4,037,581✔
703
  int32_t  code = 0;
4,037,581✔
704
  int32_t  lino = 0;
4,037,581✔
705
  void*    pTask = sStreamReaderInfo->pTask;
4,037,581✔
706
  SArray*  uidList = NULL;
4,041,203✔
707
  SArray*  tableList = NULL;
4,041,203✔
708

709
  ST_TASK_DLOG("%s start, name:%s uid:%"PRId64, __func__, pCreateReq->name, pCreateReq->uid);
4,041,904✔
710
  if (!needReLoadTableList(sStreamReaderInfo, pCreateReq->type, pCreateReq->ctb.suid, pCreateReq->uid, false) ||
4,043,860✔
711
      ignoreMetaChange(sStreamReaderInfo->tableList.version, ver)) {
1,956✔
712
    ST_TASK_DLOG("stream reader scan auto create table jump, %s", pCreateReq->name);
4,040,483✔
713
    goto end;
4,040,483✔
714
  }
715
  uidList = taosArrayInit(8, sizeof(tb_uid_t));
1,426✔
716
  STREAM_CHECK_NULL_GOTO(uidList, terrno);
1,426✔
717
  STREAM_CHECK_NULL_GOTO(taosArrayPush(uidList, &pCreateReq->uid), terrno);
2,852✔
718
  ST_TASK_DLOG("stream reader scan auto create table %s", pCreateReq->name);
1,426✔
719

720
  STREAM_CHECK_RET_GOTO(processTableList(sStreamReaderInfo, uidList, &tableList));
1,426✔
721
  STREAM_CHECK_RET_GOTO(qStreamModifyTableList(sStreamReaderInfo, tableList, uidList));
1,426✔
722
end:
4,039,150✔
723
  taosArrayDestroy(uidList);
4,041,679✔
724
  taosArrayDestroy(tableList);
4,041,001✔
725
  return code;
4,040,998✔
726
}
727

728
static bool isColIdInList(SNodeList* colList, col_id_t cid){
460✔
729
  int32_t  code = 0;
460✔
730
  int32_t  lino = 0;
460✔
731
  SNode*  nodeItem = NULL;
460✔
732
  FOREACH(nodeItem, colList) {
1,150✔
733
    SNode*           pNode = ((STargetNode*)nodeItem)->pExpr;
1,150✔
734
    if (nodeType(pNode) == QUERY_NODE_COLUMN) {
1,150✔
735
      SColumnNode*     valueNode = (SColumnNode*)(pNode);
1,150✔
736
      if (cid == valueNode->colId) {
1,150✔
737
        return true;
460✔
738
      }
739
    }
740
  }
741
end:
×
742
  return false;
×
743
}
744

745
static bool isAlteredTable(ETableType tbType) {
5,101✔
746
  return tbType == TSDB_CHILD_TABLE || tbType == TSDB_VIRTUAL_CHILD_TABLE || tbType == TSDB_VIRTUAL_NORMAL_TABLE;
5,101✔
747
}
748

749
void getAlterColId(void* pVnode, int64_t uid, const char* colName, col_id_t* colId) {
460✔
750
  SSchemaWrapper *pSchema = metaGetTableSchema(((SVnode *)pVnode)->pMeta, uid, -1, 1, NULL, 0);
460✔
751
  if (pSchema == NULL) {
460✔
752
    return;
×
753
  }
754
  for (int32_t i = 0; i < pSchema->nCols; i++) {
1,150✔
755
    if (strncmp(pSchema->pSchema[i].name, colName, TSDB_COL_NAME_LEN) == 0) {
1,150✔
756
      *colId = pSchema->pSchema[i].colId;
460✔
757
      break;
460✔
758
    }
759
  }
760
  tDeleteSchemaWrapper(pSchema);
761
  return;
460✔
762
}
763

764
static bool checkAlterCondition() {
×
765
  return true;
×
766
}
767

768
static int32_t scanAlterTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* rsp, void* data, int32_t len, int64_t ver) {
34,287✔
769
  int32_t  code = 0;
34,287✔
770
  int32_t  lino = 0;
34,287✔
771
  SDecoder decoder = {0};
34,287✔
772
  void* pTask = sStreamReaderInfo->pTask;
34,287✔
773
  SArray*  uidList = NULL;
34,287✔
774
  SArray*  uidListAdd = NULL;
34,287✔
775
  SArray*  uidListDel = NULL;
34,287✔
776
  SArray*  tableList = NULL;
34,287✔
777

778
  ST_TASK_DLOG("%s start", __func__);
34,287✔
779

780
  SVAlterTbReq req = {0};
34,287✔
781
  tDecoderInit(&decoder, data, len);
34,287✔
782
  
783
  STREAM_CHECK_RET_GOTO(tDecodeSVAlterTbReq(&decoder, &req));
34,287✔
784

785

786
  // TODO:
787
  // 1. TSDB_ALTER_TABLE_UPDATE_TAG_VAL and TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL is not used any more. 
788
  // 2. TSDB_ALTER_TABLE_UPDATE_MULTI_TABLE_TAG_VAL and TSDB_ALTER_TABLE_UPDATE_CHILD_TABLE_TAG_VAL are
789
  //    added, both support updating tag value for multiple tables.
790

791

792
  STREAM_CHECK_CONDITION_GOTO(req.action != TSDB_ALTER_TABLE_UPDATE_TAG_VAL && req.action != TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL && 
33,888✔
793
    req.action != TSDB_ALTER_TABLE_ALTER_COLUMN_REF && req.action != TSDB_ALTER_TABLE_REMOVE_COLUMN_REF, TDB_CODE_SUCCESS);
794

795
  ETableType tbType = 0;
5,101✔
796
  uint64_t suid = 0;
5,101✔
797
  code = metaGetTableTypeSuidByName(sStreamReaderInfo->pVnode, req.tbName, &tbType, &suid);
5,101✔
798
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
5,101✔
799
    code = 0;
×
800
    ST_TASK_WLOG("stream reader scan alter table %s not exist, metaGetTableTypeSuidByName", req.tbName);
×
801
    goto end;
×
802
  }
803
  STREAM_CHECK_CONDITION_GOTO(!isAlteredTable(tbType), TDB_CODE_SUCCESS);
5,101✔
804
  STREAM_CHECK_CONDITION_GOTO(suid != sStreamReaderInfo->suid, TDB_CODE_SUCCESS);
5,101✔
805

806
  if (sStreamReaderInfo->isVtableStream) {
2,007✔
807
    uidListAdd = taosArrayInit(8, sizeof(tb_uid_t));
460✔
808
    STREAM_CHECK_NULL_GOTO(uidListAdd, terrno);
460✔
809
  }
810
  uint64_t uid = 0;
2,007✔
811
  code = metaGetTableUidByName(sStreamReaderInfo->pVnode, req.tbName, &uid);
2,007✔
812
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
2,007✔
813
    code = 0;
×
814
    ST_TASK_WLOG("stream reader scan alter table %s not exist, metaGetTableUidByName", req.tbName);
×
815
    goto end;
×
816
  }
817
  if (req.action == TSDB_ALTER_TABLE_ALTER_COLUMN_REF || req.action == TSDB_ALTER_TABLE_REMOVE_COLUMN_REF) {
2,467✔
818
    uint64_t id = 0;
2,007✔
819
    STREAM_CHECK_CONDITION_GOTO(!uidInTableListOrigin(sStreamReaderInfo, suid, uid, &id), TDB_CODE_SUCCESS);
2,007✔
820
    col_id_t colId = 0;
460✔
821
    getAlterColId(sStreamReaderInfo->pVnode, uid, req.colName, &colId);
460✔
822
    if (atomic_load_8(&sStreamReaderInfo->isVtableOnlyTs) == 0 && !isColIdInList(sStreamReaderInfo->triggerCols, colId)) {    //todo calc cols
460✔
823
      ST_TASK_ILOG("stream reader scan alter table %s, colId %d not in trigger cols", req.tbName, colId);
×
824
      goto end;
×
825
    }
826
    STREAM_CHECK_NULL_GOTO(taosArrayPush(uidListAdd, &uid), terrno);
460✔
827
    STREAM_CHECK_RET_GOTO(addUidListToBlock(uidListAdd, &rsp->tableBlock, ver, &rsp->totalRows, TABLE_BLOCK_ADD));
460✔
828
  } else {
829
    uidList = taosArrayInit(8, sizeof(tb_uid_t));
×
830
    STREAM_CHECK_NULL_GOTO(uidList, terrno);
×
831
    
832
    uidListDel = taosArrayInit(8, sizeof(tb_uid_t));
×
833
    STREAM_CHECK_NULL_GOTO(uidListDel, terrno);
×
834
    STREAM_CHECK_NULL_GOTO(taosArrayPush(uidList, &uid), terrno);
×
835
    STREAM_CHECK_RET_GOTO(processTableList(sStreamReaderInfo, uidList, &tableList));
×
836
    STREAM_CHECK_RET_GOTO(qStreamGetDelTable(sStreamReaderInfo, uidList, uidListDel));
×
837
    if (rsp->checkAlter && taosArrayGetSize(uidListDel) > 0 && rsp->totalDataRows > 0) {
×
838
      rsp->needReturn = true;
×
839
      rsp->ver--;
×
840
      ST_TASK_DLOG("stream reader scan alter table %s need return data again for uid %" PRId64, req.tbName, uid);
×
841
      goto end;
×
842
    }
843
    STREAM_CHECK_RET_GOTO(qStreamGetAddTable(sStreamReaderInfo, tableList, uidListAdd));
×
844
    if (sStreamReaderInfo->isVtableStream) {
×
845
      STREAM_CHECK_RET_GOTO(addUidListToBlock(uidListAdd, &rsp->tableBlock, ver, &rsp->totalRows, TABLE_BLOCK_ADD));
×
846
      STREAM_CHECK_RET_GOTO(addUidListToBlock(uidListDel, &rsp->tableBlock, ver, &rsp->totalRows, TABLE_BLOCK_RETIRE));
×
847
    }
848
    STREAM_CHECK_RET_GOTO(qStreamModifyTableList(sStreamReaderInfo, tableList, uidList));
×
849
  }
850
  ST_TASK_DLOG("stream reader scan alter table %s", req.tbName);
460✔
851

852
end:
33,888✔
853
  taosArrayDestroy(uidList);
33,888✔
854
  taosArrayDestroy(uidListAdd);
33,888✔
855
  taosArrayDestroy(uidListDel);
34,057✔
856
  taosArrayDestroy(tableList);
34,057✔
857
  taosArrayDestroy(req.pMultiTag);
34,057✔
858
  for (int32_t i = 0; i < taosArrayGetSize(req.tables); i++) {
53,583✔
859
    SUpdateTableTagVal* pTable = taosArrayGet(req.tables, i);
19,526✔
860
    taosArrayDestroy(pTable->tags);
19,526✔
861
  }
862
  taosArrayDestroy(req.tables);
34,057✔
863
  tDecoderClear(&decoder);
34,057✔
864
  STREAM_PRINT_LOG_END_WITHID(code, lino);
34,287✔
865
  return code;
34,287✔
866
}
867

868
// static int32_t scanAlterSTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, void* data, int32_t len) {
869
//   int32_t  code = 0;
870
//   int32_t  lino = 0;
871
//   SDecoder decoder = {0};
872
//   SMAlterStbReq reqAlter = {0};
873
//   SVCreateStbReq req = {0};
874
//   tDecoderInit(&decoder, data, len);
875
//   void* pTask = sStreamReaderInfo->pTask;
876
  
877
//   STREAM_CHECK_RET_GOTO(tDecodeSVCreateStbReq(&decoder, &req));
878
//   STREAM_CHECK_CONDITION_GOTO(req.suid != sStreamReaderInfo->suid, TDB_CODE_SUCCESS);
879
//   if (req.alterOriData != 0) {
880
//     STREAM_CHECK_RET_GOTO(tDeserializeSMAlterStbReq(req.alterOriData, req.alterOriDataLen, &reqAlter));
881
//     STREAM_CHECK_CONDITION_GOTO(reqAlter.alterType != TSDB_ALTER_TABLE_DROP_TAG && reqAlter.alterType != TSDB_ALTER_TABLE_UPDATE_TAG_NAME, TDB_CODE_SUCCESS);
882
//   }
883
  
884
//   STREAM_CHECK_RET_GOTO(processTableList(sStreamReaderInfo));
885

886
//   ST_TASK_DLOG("stream reader scan alter suid %" PRId64, req.suid);
887
// end:
888
//   tFreeSMAltertbReq(&reqAlter);
889
//   tDecoderClear(&decoder);
890
//   return code;
891
// }
892

893
// static int32_t scanDropSTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, void* data, int32_t len) {
894
//   int32_t  code = 0;
895
//   int32_t  lino = 0;
896
//   SDecoder decoder = {0};
897
//   void* pTask = sStreamReaderInfo->pTask;
898

899
//   SVDropStbReq req = {0};
900
//   tDecoderInit(&decoder, data, len);
901
//   STREAM_CHECK_RET_GOTO(tDecodeSVDropStbReq(&decoder, &req));
902
//   STREAM_CHECK_CONDITION_GOTO(req.suid != sStreamReaderInfo->suid, TDB_CODE_SUCCESS);
903

904
//   ST_TASK_DLOG("stream reader scan drop suid %" PRId64, req.suid);
905
// end:
906
//   tDecoderClear(&decoder);
907
//   return code;
908
// }
909

910
static int32_t scanSubmitTbDataForMeta(SDecoder *pCoder, SStreamTriggerReaderInfo* sStreamReaderInfo, SSHashObj* gidHash, int64_t ver) {
35,130,218✔
911
  int32_t code = 0;
35,130,218✔
912
  int32_t lino = 0;
35,130,218✔
913
  WalMetaResult walMeta = {0};
35,130,218✔
914
  SSubmitTbData submitTbData = {0};
35,130,880✔
915
  
916
  if (tStartDecode(pCoder) < 0) {
35,131,297✔
917
    code = TSDB_CODE_INVALID_MSG;
×
918
    TSDB_CHECK_CODE(code, lino, end);
×
919
  }
920

921
  uint8_t       version = 0;
35,132,747✔
922
  if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) {
35,130,503✔
923
    code = TSDB_CODE_INVALID_MSG;
×
924
    TSDB_CHECK_CODE(code, lino, end);
×
925
  }
926
  version = (submitTbData.flags >> 8) & 0xff;
35,130,503✔
927
  submitTbData.flags = submitTbData.flags & 0xff;
35,130,503✔
928

929
  // STREAM_CHECK_CONDITION_GOTO(version < 2, TDB_CODE_SUCCESS);
930
  if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
35,130,503✔
931
    submitTbData.pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
3,269,519✔
932
    STREAM_CHECK_NULL_GOTO(submitTbData.pCreateTbReq, terrno);
3,268,158✔
933
    STREAM_CHECK_RET_GOTO(tDecodeSVCreateTbReq(pCoder, submitTbData.pCreateTbReq));
3,268,158✔
934
    STREAM_CHECK_RET_GOTO(processAutoCreateTableNew(sStreamReaderInfo, submitTbData.pCreateTbReq, ver));
3,269,519✔
935
  }
936

937
  // submit data
938
  if (tDecodeI64(pCoder, &submitTbData.suid) < 0) {
35,123,343✔
939
    code = TSDB_CODE_INVALID_MSG;
×
940
    TSDB_CHECK_CODE(code, lino, end);
×
941
  }
942
  if (tDecodeI64(pCoder, &submitTbData.uid) < 0) {
35,130,067✔
943
    code = TSDB_CODE_INVALID_MSG;
×
944
    TSDB_CHECK_CODE(code, lino, end);
×
945
  }
946

947
  if (!uidInTableListSet(sStreamReaderInfo, submitTbData.suid, submitTbData.uid, &walMeta.id, false)){
35,130,067✔
948
    goto end;
28,053,990✔
949
  }
950
  if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) {
7,077,372✔
951
    code = TSDB_CODE_INVALID_MSG;
×
952
    TSDB_CHECK_CODE(code, lino, end);
×
953
  }
954

955
  if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
7,077,372✔
956
    uint64_t nColData = 0;
×
957
    if (tDecodeU64v(pCoder, &nColData) < 0) {
×
958
      code = TSDB_CODE_INVALID_MSG;
×
959
      TSDB_CHECK_CODE(code, lino, end);
×
960
    }
961

962
    SColData colData = {0};
×
963
    code = tDecodeColData(version, pCoder, &colData, false);
×
964
    if (code) {
×
965
      code = TSDB_CODE_INVALID_MSG;
×
966
      TSDB_CHECK_CODE(code, lino, end);
×
967
    }
968

969
    if (colData.flag != HAS_VALUE) {
×
970
      code = TSDB_CODE_INVALID_MSG;
×
971
      TSDB_CHECK_CODE(code, lino, end);
×
972
    }
973
    walMeta.skey = ((TSKEY *)colData.pData)[0];
×
974
    walMeta.ekey = ((TSKEY *)colData.pData)[colData.nVal - 1];
×
975

976
    for (uint64_t i = 1; i < nColData; i++) {
×
977
      code = tDecodeColData(version, pCoder, &colData, true);
×
978
      if (code) {
×
979
        code = TSDB_CODE_INVALID_MSG;
×
980
        TSDB_CHECK_CODE(code, lino, end);
×
981
      }
982
    }
983
  } else {
984
    uint64_t nRow = 0;
7,077,372✔
985
    if (tDecodeU64v(pCoder, &nRow) < 0) {
7,077,372✔
986
      code = TSDB_CODE_INVALID_MSG;
×
987
      TSDB_CHECK_CODE(code, lino, end);
×
988
    }
989

990
    for (int32_t iRow = 0; iRow < nRow; ++iRow) {
15,222,666✔
991
      SRow *pRow = (SRow *)(pCoder->data + pCoder->pos);
8,145,727✔
992
      pCoder->pos += pRow->len;
8,145,927✔
993
      if (iRow == 0){
8,145,032✔
994
#ifndef NO_UNALIGNED_ACCESS
995
        walMeta.skey = pRow->ts;
7,077,372✔
996
#else
997
        walMeta.skey = taosGetInt64Aligned(&pRow->ts);
998
#endif
999
      }
1000
      if (iRow == nRow - 1) {
8,144,802✔
1001
#ifndef NO_UNALIGNED_ACCESS
1002
        walMeta.ekey = pRow->ts;
7,077,372✔
1003
#else
1004
        walMeta.ekey = taosGetInt64Aligned(&pRow->ts);
1005
#endif
1006
      }
1007
    }
1008
  }
1009

1010
  WalMetaResult* data = (WalMetaResult*)tSimpleHashGet(gidHash, &walMeta.id, LONG_BYTES);
7,076,906✔
1011
  if (data != NULL) {
7,077,141✔
1012
    if (walMeta.skey < data->skey) data->skey = walMeta.skey;
940✔
1013
    if (walMeta.ekey > data->ekey) data->ekey = walMeta.ekey;
940✔
1014
  } else {
1015
    STREAM_CHECK_RET_GOTO(tSimpleHashPut(gidHash, &walMeta.id, LONG_BYTES, &walMeta, sizeof(WalMetaResult)));
7,076,201✔
1016
  }
1017

1018
end:
35,125,316✔
1019
  tDestroySVSubmitCreateTbReq(submitTbData.pCreateTbReq, TSDB_MSG_FLG_DECODE);
35,129,048✔
1020
  taosMemoryFreeClear(submitTbData.pCreateTbReq);
35,127,789✔
1021
  tEndDecode(pCoder);
35,128,016✔
1022
  return code;
35,129,176✔
1023
}
1024

1025
static int32_t scanSubmitDataForMeta(SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* rsp, void* data, int32_t len, int64_t ver) {
35,130,615✔
1026
  int32_t  code = 0;
35,130,615✔
1027
  int32_t  lino = 0;
35,130,615✔
1028
  SDecoder decoder = {0};
35,130,615✔
1029
  SSHashObj* gidHash = NULL;
35,131,129✔
1030
  void* pTask = sStreamReaderInfo->pTask;
35,131,129✔
1031

1032
  tDecoderInit(&decoder, data, len);
35,132,260✔
1033
  if (tStartDecode(&decoder) < 0) {
35,130,206✔
1034
    code = TSDB_CODE_INVALID_MSG;
×
1035
    TSDB_CHECK_CODE(code, lino, end);
×
1036
  }
1037

1038
  uint64_t nSubmitTbData = 0;
35,129,796✔
1039
  if (tDecodeU64v(&decoder, &nSubmitTbData) < 0) {
35,125,979✔
1040
    code = TSDB_CODE_INVALID_MSG;
×
1041
    TSDB_CHECK_CODE(code, lino, end);
×
1042
  }
1043

1044
  gidHash = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
35,125,979✔
1045
  STREAM_CHECK_NULL_GOTO(gidHash, terrno);
35,123,553✔
1046

1047
  for (uint64_t i = 0; i < nSubmitTbData; i++) {
70,255,187✔
1048
    STREAM_CHECK_RET_GOTO(scanSubmitTbDataForMeta(&decoder, sStreamReaderInfo, gidHash, ver));
35,124,934✔
1049
  }
1050
  tEndDecode(&decoder);
35,130,253✔
1051

1052
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->metaBlock, ((SSDataBlock*)rsp->metaBlock)->info.rows + tSimpleHashGetSize(gidHash)));
35,129,127✔
1053
  int32_t iter = 0;
35,124,183✔
1054
  void*   px = tSimpleHashIterate(gidHash, NULL, &iter);
35,124,701✔
1055
  while (px != NULL) {
42,200,689✔
1056
    WalMetaResult* pMeta = (WalMetaResult*)px;
7,076,202✔
1057
    STREAM_CHECK_RET_GOTO(buildWalMetaBlockNew(rsp->metaBlock, pMeta->id, pMeta->skey, pMeta->ekey, ver));
7,076,202✔
1058
    ((SSDataBlock*)rsp->metaBlock)->info.rows++;
7,077,327✔
1059
    rsp->totalRows++;
7,077,327✔
1060
    ST_TASK_DLOG("stream reader scan submit data:skey %" PRId64 ", ekey %" PRId64 ", id %" PRIu64
7,076,665✔
1061
          ", ver:%"PRId64, pMeta->skey, pMeta->ekey, pMeta->id, ver);
1062
    px = tSimpleHashIterate(gidHash, px, &iter);
7,077,327✔
1063
  }
1064
end:
35,124,487✔
1065
  tDecoderClear(&decoder);
35,126,492✔
1066
  tSimpleHashCleanup( gidHash);
35,129,327✔
1067
  return code;
35,117,031✔
1068
}
1069

1070
static int32_t createBlockForTsdbMeta(SSDataBlock** pBlock, bool isVTable) {
510,530✔
1071
  int32_t code = 0;
510,530✔
1072
  int32_t lino = 0;
510,530✔
1073
  SArray* schemas = taosArrayInit(8, sizeof(SSchema));
510,530✔
1074
  STREAM_CHECK_NULL_GOTO(schemas, terrno);
510,530✔
1075

1076
  int32_t index = 1;
510,530✔
1077
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, index++))  // skey
510,530✔
1078
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, index++))  // ekey
510,530✔
1079
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // uid
510,530✔
1080
  if (!isVTable) {
510,530✔
1081
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_UBIGINT, LONG_BYTES, index++))  // gid
44,111✔
1082
  }
1083
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))     // nrows
510,530✔
1084

1085
  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, pBlock));
510,530✔
1086

1087
end:
510,530✔
1088
  taosArrayDestroy(schemas);
510,530✔
1089
  return code;
510,530✔
1090
}
1091

1092
static int32_t createBlockForWalMetaNew(SSDataBlock** pBlock) {
276,977✔
1093
  int32_t code = 0;
276,977✔
1094
  int32_t lino = 0;
276,977✔
1095
  SArray* schemas = NULL;
276,977✔
1096

1097
  schemas = taosArrayInit(8, sizeof(SSchema));
276,977✔
1098
  STREAM_CHECK_NULL_GOTO(schemas, terrno);
276,977✔
1099

1100
  int32_t index = 0;
276,977✔
1101
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // gid non vtable/uid vtable
276,977✔
1102
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // skey
276,977✔
1103
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // ekey
276,977✔
1104
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // ver
276,977✔
1105

1106
  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, pBlock));
276,977✔
1107

1108
end:
276,977✔
1109
  taosArrayDestroy(schemas);
276,977✔
1110
  return code;
276,977✔
1111
}
1112

1113
static int32_t processMeta(int16_t msgType, SStreamTriggerReaderInfo* sStreamReaderInfo, void *data, int32_t len, SSTriggerWalNewRsp* rsp, int64_t ver) {
465,963✔
1114
  int32_t code = 0;
465,963✔
1115
  int32_t lino = 0;
465,963✔
1116
  void* pTask = sStreamReaderInfo->pTask;
465,963✔
1117

1118
  ST_TASK_DLOG("%s check meta msg, stream ver:%" PRId64 ", wal ver:%" PRId64, __func__, sStreamReaderInfo->tableList.version, ver);
465,963✔
1119

1120
  SDecoder dcoder = {0};
465,963✔
1121
  tDecoderInit(&dcoder, data, len);
465,963✔
1122
  if (msgType == TDMT_VND_DELETE && sStreamReaderInfo->deleteReCalc != 0) {
465,963✔
1123
    if (rsp->deleteBlock == NULL) {
29,296✔
1124
      STREAM_CHECK_RET_GOTO(createBlockForWalMetaNew((SSDataBlock**)&rsp->deleteBlock));
10,440✔
1125
    }
1126
      
1127
    STREAM_CHECK_RET_GOTO(scanDeleteDataNew(sStreamReaderInfo, rsp, data, len, ver));
29,296✔
1128
  } else if (msgType == TDMT_VND_DROP_TABLE && 
436,667✔
1129
    (sStreamReaderInfo->deleteOutTbl != 0 || sStreamReaderInfo->isVtableStream)) {
47,302✔
1130
    STREAM_CHECK_RET_GOTO(scanDropTableNew(sStreamReaderInfo, rsp, data, len, ver));
30,628✔
1131
  // } else if (msgType == TDMT_VND_DROP_STB) {
1132
  //   STREAM_CHECK_RET_GOTO(scanDropSTableNew(sStreamReaderInfo, data, len));
1133
  } else if (msgType == TDMT_VND_CREATE_TABLE && !ignoreMetaChange(sStreamReaderInfo->tableList.version, ver)) {
406,039✔
1134
    STREAM_CHECK_RET_GOTO(scanCreateTableNew(sStreamReaderInfo, rsp, data, len, ver));
24,334✔
1135
  } else if (msgType == TDMT_VND_ALTER_STB && !ignoreMetaChange(sStreamReaderInfo->tableList.version, ver)) {
381,705✔
1136
    // STREAM_CHECK_RET_GOTO(scanAlterSTableNew(sStreamReaderInfo, data, len));
1137
  } else if (msgType == TDMT_VND_ALTER_TABLE && !ignoreMetaChange(sStreamReaderInfo->tableList.version, ver)) {
356,770✔
1138
    STREAM_CHECK_RET_GOTO(scanAlterTableNew(sStreamReaderInfo, rsp, data, len, ver));
34,287✔
1139
  }
1140

1141
end:
465,963✔
1142
  tDecoderClear(&dcoder);
465,733✔
1143
  return code;
465,733✔
1144
}
1145
static int32_t processWalVerMetaNew(SVnode* pVnode, SSTriggerWalNewRsp* rsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
11,374,689✔
1146
                       int64_t ctime) {
1147
  int32_t code = 0;
11,374,689✔
1148
  int32_t lino = 0;
11,374,689✔
1149
  void* pTask = sStreamReaderInfo->pTask;
11,374,689✔
1150

1151
  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
11,375,149✔
1152
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);
11,374,064✔
1153
  code = walReaderSeekVer(pWalReader, rsp->ver);
11,374,064✔
1154
  if (code == TSDB_CODE_WAL_LOG_NOT_EXIST){
11,372,165✔
1155
    if (rsp->ver < walGetFirstVer(pWalReader->pWal)) {
6,503,285✔
1156
      rsp->ver = walGetFirstVer(pWalReader->pWal);
×
1157
      rsp->verTime = 0;
×
1158
    } else {
1159
      rsp->verTime = taosGetTimestampUs();
6,503,358✔
1160
    }
1161
    ST_TASK_DLOG("vgId:%d %s scan wal end:%s", TD_VID(pVnode), __func__, tstrerror(code));
6,504,444✔
1162
    code = TSDB_CODE_SUCCESS;
6,506,011✔
1163
    goto end;
6,506,011✔
1164
  }
1165
  STREAM_CHECK_RET_GOTO(code);
4,868,880✔
1166

1167
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->metaBlock, STREAM_RETURN_ROWS_NUM));
4,868,880✔
1168
  while (1) {
35,482,746✔
1169
    code = walNextValidMsg(pWalReader, true);
40,350,477✔
1170
    if (code == TSDB_CODE_WAL_LOG_NOT_EXIST){
40,347,082✔
1171
      rsp->verTime = taosGetTimestampUs();
4,869,125✔
1172
      ST_TASK_DLOG("vgId:%d %s scan wal end:%s", TD_VID(pVnode), __func__, tstrerror(code));
4,869,125✔
1173
      code = TSDB_CODE_SUCCESS;
4,869,125✔
1174
      goto end;
4,869,125✔
1175
    }
1176
    STREAM_CHECK_RET_GOTO(code);
35,477,279✔
1177
    rsp->ver = pWalReader->curVersion;
35,477,279✔
1178
    SWalCont* wCont = &pWalReader->pHead->head;
35,485,138✔
1179
    rsp->verTime = wCont->ingestTs;
35,482,630✔
1180
    if (wCont->ingestTs / 1000 > ctime) break;
35,483,108✔
1181
    void*   data = POINTER_SHIFT(wCont->body, sizeof(SMsgHead));
35,483,604✔
1182
    int32_t len = wCont->bodyLen - sizeof(SMsgHead);
35,482,847✔
1183
    int64_t ver = wCont->version;
35,481,993✔
1184

1185
    ST_TASK_DLOG("vgId:%d stream reader scan wal ver:%" PRId64 "/%" PRId64 ", type:%s, deleteData:%d, deleteTb:%d",
35,484,332✔
1186
      TD_VID(pVnode), ver, walGetAppliedVer(pWalReader->pWal), TMSG_INFO(wCont->msgType), sStreamReaderInfo->deleteReCalc, sStreamReaderInfo->deleteOutTbl);
1187
    if (wCont->msgType == TDMT_VND_SUBMIT) {
35,477,561✔
1188
      data = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
35,131,592✔
1189
      len = wCont->bodyLen - sizeof(SSubmitReq2Msg);
35,131,825✔
1190
      STREAM_CHECK_RET_GOTO(scanSubmitDataForMeta(sStreamReaderInfo, rsp, data, len, ver));
35,131,592✔
1191
    } else {
1192
      STREAM_CHECK_RET_GOTO(processMeta(wCont->msgType, sStreamReaderInfo, data, len, rsp, ver));
355,500✔
1193
    }
1194

1195
    if (rsp->totalRows >= STREAM_RETURN_ROWS_NUM) {
35,479,189✔
1196
      break;
×
1197
    }
1198
  }
1199

1200
end:
11,375,136✔
1201
  walCloseReader(pWalReader);
11,375,136✔
1202
  return code;
11,373,750✔
1203
}
1204

1205
int32_t cacheTag(SVnode* pVnode, SHashObj* metaCache, SExprInfo* pExprInfo, int32_t numOfExpr, SStorageAPI* api, uint64_t uid, col_id_t colId, SRWLatch* lock) {
41,001,294✔
1206
  int32_t     code = 0;
41,001,294✔
1207
  int32_t     lino = 0;
41,001,294✔
1208
  SMetaReader mr = {0};
41,001,294✔
1209
  SArray* tagCache = NULL;
41,004,762✔
1210
  char* data = NULL;
41,008,169✔
1211

1212
  if (lock != NULL) taosWLockLatch(lock);
41,008,324✔
1213
  STREAM_CHECK_CONDITION_GOTO(numOfExpr == 0, code);
41,013,839✔
1214
  stDebug("%s start,uid:%"PRIu64, __func__, uid);
2,183,795✔
1215
  void* uidData = taosHashGet(metaCache, &uid, LONG_BYTES);
2,183,795✔
1216
  if (uidData == NULL) {
2,183,795✔
1217
    tagCache = taosArrayInit(numOfExpr, POINTER_BYTES);
2,183,121✔
1218
    STREAM_CHECK_NULL_GOTO(tagCache, terrno);
2,183,121✔
1219
    if(taosHashPut(metaCache, &uid, LONG_BYTES, &tagCache, POINTER_BYTES) != 0) {
2,183,121✔
1220
      taosArrayDestroy(tagCache);
×
1221
      code = terrno;
×
1222
      goto end;
×
1223
    }
1224
  } else {
1225
    tagCache = *(SArray**)uidData;
674✔
1226
    stDebug("%s found tagCache, size:%zu %d, uid:%"PRIu64, __func__, taosArrayGetSize(tagCache), numOfExpr, uid);
674✔
1227
    STREAM_CHECK_CONDITION_GOTO(taosArrayGetSize(tagCache) != numOfExpr, TSDB_CODE_INVALID_PARA);
674✔
1228
  }
1229
  
1230
  api->metaReaderFn.initReader(&mr, pVnode, META_READER_LOCK, &api->metaFn);
2,183,795✔
1231
  code = api->metaReaderFn.getEntryGetUidCache(&mr, uid);
2,183,795✔
1232
  api->metaReaderFn.readerReleaseLock(&mr);
2,183,436✔
1233
  STREAM_CHECK_RET_GOTO(code);
2,183,436✔
1234
  
1235
  for (int32_t j = 0; j < numOfExpr; ++j) {
7,677,991✔
1236
    const SExprInfo* pExpr1 = &pExprInfo[j];
5,495,949✔
1237
    int32_t functionId = pExpr1->pExpr->_function.functionId;
5,494,824✔
1238
    col_id_t cid = 0;
5,495,735✔
1239
    // this is to handle the tbname
1240
    if (fmIsScanPseudoColumnFunc(functionId)) {
5,495,735✔
1241
      int32_t fType = pExpr1->pExpr->_function.functionType;
542,099✔
1242
      if (fType == FUNCTION_TYPE_TBNAME) {
542,099✔
1243
        data = taosMemoryCalloc(1, strlen(mr.me.name) + VARSTR_HEADER_SIZE);
542,099✔
1244
        STREAM_CHECK_NULL_GOTO(data, terrno);
542,099✔
1245
        STR_TO_VARSTR(data, mr.me.name)
542,099✔
1246
      }
1247
      cid = -1;
542,099✔
1248
    } else {  // these are tags
1249
      const char* p = NULL;
4,953,531✔
1250
      char* pData = NULL;
4,953,531✔
1251
      int8_t type = pExpr1->base.resSchema.type;
4,953,531✔
1252
      int32_t len = pExpr1->base.resSchema.bytes;
4,954,206✔
1253
      STagVal tagVal = {0};
4,954,190✔
1254
      tagVal.cid = pExpr1->base.pParam[0].pCol->colId;
4,953,815✔
1255
      cid = tagVal.cid;
4,954,190✔
1256
      if (colId != 0 && cid != colId) {
4,954,190✔
1257
        continue;
1,011✔
1258
      }
1259
      p = api->metaFn.extractTagVal(mr.me.ctbEntry.pTags, type, &tagVal);
4,953,179✔
1260

1261
      if (type != TSDB_DATA_TYPE_JSON && p != NULL) {
4,950,961✔
1262
        pData = tTagValToData((const STagVal*)p, false);
4,947,581✔
1263
      } else {
1264
        pData = (char*)p;
3,380✔
1265
      }
1266

1267
      if (pData != NULL && (type == TSDB_DATA_TYPE_JSON || !IS_VAR_DATA_TYPE(type))) {
4,951,695✔
1268
        if (type == TSDB_DATA_TYPE_JSON) {
2,468,394✔
1269
          len = getJsonValueLen(pData);
×
1270
        }
1271
        data = taosMemoryCalloc(1, len);
2,468,394✔
1272
        STREAM_CHECK_NULL_GOTO(data, terrno);
2,469,301✔
1273
        (void)memcpy(data, pData, len);
2,469,301✔
1274
      } else {
1275
        data = pData;
2,483,301✔
1276
      }
1277
    }
1278
    if (uidData == NULL){
5,493,752✔
1279
      STREAM_CHECK_NULL_GOTO(taosArrayPush(tagCache, &data), terrno);
10,988,202✔
1280
    } else {
1281
      void* pre = taosArrayGetP(tagCache, j);
337✔
1282
      taosMemoryFree(pre);
337✔
1283
      taosArraySet(tagCache, j, &data);
337✔
1284
    }
1285
    data = NULL;
5,495,499✔
1286
  }
1287

1288
end:
41,013,827✔
1289
  taosMemoryFree(data);
41,012,764✔
1290
  api->metaReaderFn.clearReader(&mr);
41,006,020✔
1291
  if (lock != NULL) taosWUnLockLatch(lock);
41,008,922✔
1292
  return code;
41,011,453✔
1293
}
1294

1295
int32_t fillTag(SHashObj* metaCache, SExprInfo* pExprInfo, int32_t numOfExpr,
105,079,465✔
1296
                uint64_t uid, SSDataBlock* pBlock, uint32_t currentRow, uint32_t numOfRows, uint32_t numOfBlocks, SRWLatch* lock) {
1297
  int32_t     code = 0;
105,079,465✔
1298
  int32_t     lino = 0;
105,079,465✔
1299
  SArray* tagCache = NULL;
105,079,465✔
1300
  if (numOfExpr == 0) {
105,079,465✔
1301
    return TSDB_CODE_SUCCESS;
38,389,879✔
1302
  }
1303

1304
  taosRLockLatch(lock);
66,689,586✔
1305
  void* uidData = taosHashGet(metaCache, &uid, LONG_BYTES);
66,749,765✔
1306
  if (uidData == NULL) {
66,714,848✔
1307
    stError("%s error uidData is null,uid:%"PRIu64, __func__, uid);
×
1308
  } else {
1309
    tagCache = *(SArray**)uidData;
66,714,848✔
1310
    if(taosArrayGetSize(tagCache) != numOfExpr) {
66,728,147✔
1311
      stError("%s numOfExpr:%d,tagCache size:%zu", __func__, numOfExpr, taosArrayGetSize(tagCache));
×
1312
      tagCache = NULL;
×
1313
    }
1314
  }
1315
  
1316
  for (int32_t j = 0; j < numOfExpr; ++j) {
281,179,927✔
1317
    const SExprInfo* pExpr1 = &pExprInfo[j];
214,464,530✔
1318
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;
214,436,597✔
1319

1320
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
214,453,047✔
1321
    STREAM_CHECK_NULL_GOTO(pColInfoData, terrno);
214,376,474✔
1322
    int32_t functionId = pExpr1->pExpr->_function.functionId;
214,376,474✔
1323

1324
    // this is to handle the tbname
1325
    if (fmIsScanPseudoColumnFunc(functionId)) {
214,429,106✔
1326
      int32_t fType = pExpr1->pExpr->_function.functionType;
3,405,234✔
1327
      if (fType == FUNCTION_TYPE_TBNAME) {
3,405,464✔
1328
        pColInfoData->info.colId = -1;
3,405,464✔
1329
      }
1330
    } 
1331
    char* data = tagCache == NULL ? NULL : taosArrayGetP(tagCache, j);
214,330,436✔
1332

1333
    bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
214,288,711✔
1334
    if (isNullVal) {
214,315,971✔
1335
      colDataSetNNULL(pColInfoData, currentRow, numOfRows);
×
1336
    } else {
1337
      if (!IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
214,315,971✔
1338
        for (uint32_t i = 0; i < numOfRows; i++){
2,147,483,647✔
1339
          colDataClearNull_f(pColInfoData->nullbitmap, currentRow + i);
2,147,483,647✔
1340
        }
1341
      }
1342
      code = colDataSetNItems(pColInfoData, currentRow, (const char*)data, numOfRows, numOfBlocks, false);
213,868,679✔
1343
      STREAM_CHECK_RET_GOTO(code);
214,437,061✔
1344
    }
1345
  }
1346
end:
66,715,397✔
1347
  taosRUnLockLatch(lock);
66,715,397✔
1348
  return code;
66,749,639✔
1349
}
1350

1351
static int32_t processTag(SStreamTriggerReaderInfo* info, bool isCalc, 
3,300,672✔
1352
  uint64_t uid, SSDataBlock* pBlock, uint32_t currentRow, uint32_t numOfRows, uint32_t numOfBlocks) {
1353
  int32_t     code = 0;
3,300,672✔
1354
  int32_t     lino = 0;
3,300,672✔
1355

1356
  void* pTask = info->pTask;
3,300,672✔
1357
  ST_TASK_DLOG("%s start. rows:%" PRIu32 ",uid:%"PRIu64, __func__,  numOfRows, uid);
3,300,907✔
1358
  
1359
  SHashObj* metaCache = isCalc ? info->pTableMetaCacheCalc : info->pTableMetaCacheTrigger;
3,300,907✔
1360
  SExprInfo*   pExprInfo = isCalc ? info->pExprInfoCalcTag : info->pExprInfoTriggerTag; 
3,300,677✔
1361
  int32_t      numOfExpr = isCalc ? info->numOfExprCalcTag : info->numOfExprTriggerTag;
3,300,447✔
1362
  
1363
  code = fillTag(metaCache, pExprInfo, numOfExpr, uid, pBlock, currentRow, numOfRows, numOfBlocks, &info->lock);
3,300,677✔
1364
  STREAM_CHECK_RET_GOTO(code);
3,300,439✔
1365

1366
end:
3,300,439✔
1367
  return code;
3,300,439✔
1368
}
1369

1370
int32_t getRowRange(SColData* pCol, STimeWindow* window, int32_t* rowStart, int32_t* rowEnd, int32_t* nRows) {
×
1371
  int32_t code = 0;
×
1372
  int32_t lino = 0;
×
1373
  *nRows = 0;
×
1374
  *rowStart = 0;
×
1375
  *rowEnd = pCol->nVal;
×
1376
  if (window != NULL) {
×
1377
    SColVal colVal = {0};
×
1378
    *rowStart = -1;
×
1379
    *rowEnd = -1;
×
1380
    for (int32_t k = 0; k < pCol->nVal; k++) {
×
1381
      STREAM_CHECK_RET_GOTO(tColDataGetValue(pCol, k, &colVal));
×
1382
      int64_t ts = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
×
1383
      if (ts >= window->skey && *rowStart == -1) {
×
1384
        *rowStart = k;
×
1385
      }
1386
      if (ts > window->ekey && *rowEnd == -1) {
×
1387
        *rowEnd = k;
×
1388
      }
1389
    }
1390
    STREAM_CHECK_CONDITION_GOTO(*rowStart == -1 || *rowStart == *rowEnd, TDB_CODE_SUCCESS);
×
1391

1392
    if (*rowStart != -1 && *rowEnd == -1) {
×
1393
      *rowEnd = pCol->nVal;
×
1394
    }
1395
  }
1396
  *nRows = *rowEnd - *rowStart;
×
1397

1398
end:
×
1399
  return code;
×
1400
}
1401

1402
static int32_t setColData(int64_t rows, int32_t rowStart, int32_t rowEnd, SColData* colData, SColumnInfoData* pColData) {
×
1403
  int32_t code = 0;
×
1404
  int32_t lino = 0;
×
1405
  for (int32_t k = rowStart; k < rowEnd; k++) {
×
1406
    SColVal colVal = {0};
×
1407
    STREAM_CHECK_RET_GOTO(tColDataGetValue(colData, k, &colVal));
×
1408
    STREAM_CHECK_RET_GOTO(colDataSetVal(pColData, rows + k - rowStart, VALUE_GET_DATUM(&colVal.value, colVal.value.type),
×
1409
                                        !COL_VAL_IS_VALUE(&colVal)));
1410
  }
1411
  end:
×
1412
  return code;
×
1413
}
1414

1415
static int32_t getColId(int64_t suid, int64_t uid, int16_t i, SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* rsp, int16_t* colId) {
19,214,615✔
1416
  int32_t code = 0;
19,214,615✔
1417
  int32_t lino = 0;
19,214,615✔
1418
  int64_t id[2] = {suid, uid};
19,214,615✔
1419
  taosRLockLatch(&sStreamReaderInfo->lock);
19,217,496✔
1420
  void *px = tSimpleHashGet(rsp->isCalc ? sStreamReaderInfo->uidHashCalc : sStreamReaderInfo->uidHashTrigger, id, sizeof(id));
19,217,266✔
1421
  STREAM_CHECK_NULL_GOTO(px, TSDB_CODE_INVALID_PARA);
19,217,067✔
1422
  SSHashObj* uInfo = *(SSHashObj **)px;
19,217,067✔
1423
  STREAM_CHECK_NULL_GOTO(uInfo, TSDB_CODE_INVALID_PARA);
19,217,067✔
1424
  int16_t*  tmp = tSimpleHashGet(uInfo, &i, sizeof(i));
19,217,067✔
1425
  if (tmp != NULL) {
19,213,453✔
1426
    *colId = *tmp;
17,612,779✔
1427
  } else {
1428
    *colId = -1;
1,600,674✔
1429
  }
1430

1431
end:
19,215,277✔
1432
  taosRUnLockLatch(&sStreamReaderInfo->lock);
19,214,149✔
1433
  return code;
19,217,729✔
1434
}
1435

1436
static int32_t getSchemas(SVnode* pVnode, int64_t suid, int64_t uid, int32_t sver, SStreamTriggerReaderInfo* sStreamReaderInfo, STSchema** schema) {
9,359,925✔
1437
  int32_t code = 0;
9,359,925✔
1438
  int32_t lino = 0;
9,359,925✔
1439
  int64_t id = suid != 0 ? suid : uid;
9,359,925✔
1440
  if (sStreamReaderInfo->isVtableStream) {
9,359,925✔
1441
    STSchema** schemaTmp = taosHashGet(sStreamReaderInfo->triggerTableSchemaMapVTable, &id, LONG_BYTES);
6,184,533✔
1442
    if (schemaTmp == NULL || *schemaTmp == NULL || (*schemaTmp)->version != sver) {
6,184,533✔
1443
      *schema = metaGetTbTSchema(pVnode->pMeta, id, sver, 1);
57,283✔
1444
      STREAM_CHECK_NULL_GOTO(*schema, terrno);
57,516✔
1445
      code = taosHashPut(sStreamReaderInfo->triggerTableSchemaMapVTable, &id, LONG_BYTES, schema, POINTER_BYTES);
57,516✔
1446
      if (code != 0) {
57,516✔
1447
        taosMemoryFree(*schema);
×
1448
        goto end;
×
1449
      }
1450
    } else {
1451
      *schema = *schemaTmp;
6,127,250✔
1452
    }
1453
  } else {
1454
    if (sStreamReaderInfo->triggerTableSchema == NULL || sStreamReaderInfo->triggerTableSchema->version != sver) {
3,175,588✔
1455
      taosMemoryFree(sStreamReaderInfo->triggerTableSchema);
80,752✔
1456
      sStreamReaderInfo->triggerTableSchema = metaGetTbTSchema(pVnode->pMeta, id, sver, 1);
80,752✔
1457
      STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->triggerTableSchema, terrno);
80,752✔
1458
    }
1459
    *schema = sStreamReaderInfo->triggerTableSchema;
3,175,358✔
1460
  }
1461
  
1462
end:
9,360,826✔
1463
  return code;
9,360,596✔
1464
}
1465

1466
static int32_t scanSubmitTbData(SVnode* pVnode, SDecoder *pCoder, SStreamTriggerReaderInfo* sStreamReaderInfo, 
10,306,614✔
1467
  SSHashObj* ranges, SSHashObj* gidHash, SSTriggerWalNewRsp* rsp, int64_t ver) {
1468
  int32_t code = 0;
10,306,614✔
1469
  int32_t lino = 0;
10,306,614✔
1470
  uint64_t id = 0;
10,306,614✔
1471
  WalMetaResult walMeta = {0};
10,306,844✔
1472
  void* pTask = sStreamReaderInfo->pTask;
10,307,322✔
1473
  SSDataBlock * pBlock = (SSDataBlock*)rsp->dataBlock;
10,306,832✔
1474

1475
  if (tStartDecode(pCoder) < 0) {
10,306,832✔
1476
    ST_TASK_ELOG("vgId:%d %s invalid submit data", TD_VID(pVnode), __func__);
×
1477
    code = TSDB_CODE_INVALID_MSG;
×
1478
    TSDB_CHECK_CODE(code, lino, end);
×
1479
  }
1480

1481
  SSubmitTbData submitTbData = {0};
10,307,560✔
1482
  uint8_t       version = 0;
10,305,569✔
1483
  if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) {
10,307,790✔
1484
    ST_TASK_ELOG("vgId:%d %s invalid submit data flags", TD_VID(pVnode), __func__);
×
1485
    code = TSDB_CODE_INVALID_MSG;
×
1486
    TSDB_CHECK_CODE(code, lino, end);
×
1487
  }
1488
  version = (submitTbData.flags >> 8) & 0xff;
10,307,790✔
1489
  submitTbData.flags = submitTbData.flags & 0xff;
10,307,790✔
1490
  // STREAM_CHECK_CONDITION_GOTO(version < 2, TDB_CODE_SUCCESS);
1491
  if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
10,307,790✔
1492
    if (tStartDecode(pCoder) < 0) {
54,283✔
1493
      ST_TASK_ELOG("vgId:%d %s invalid auto create table data", TD_VID(pVnode), __func__);
×
1494
      code = TSDB_CODE_INVALID_MSG;
×
1495
      TSDB_CHECK_CODE(code, lino, end);
×
1496
    }
1497
    tEndDecode(pCoder);
54,283✔
1498
  }
1499

1500
  // submit data
1501
  if (tDecodeI64(pCoder, &submitTbData.suid) < 0) {
10,307,353✔
1502
    ST_TASK_ELOG("vgId:%d %s invalid submit data suid", TD_VID(pVnode), __func__);
×
1503
    code = TSDB_CODE_INVALID_MSG;
×
1504
    TSDB_CHECK_CODE(code, lino, end);
×
1505
  }
1506
  if (tDecodeI64(pCoder, &submitTbData.uid) < 0) {
10,306,433✔
1507
    ST_TASK_ELOG("vgId:%d %s invalid submit data uid", TD_VID(pVnode), __func__);
×
1508
    code = TSDB_CODE_INVALID_MSG;
×
1509
    TSDB_CHECK_CODE(code, lino, end);
×
1510
  }
1511

1512
  ST_TASK_DLOG("%s uid:%" PRId64 ", suid:%" PRId64 ", ver:%" PRId64, __func__, submitTbData.uid, submitTbData.suid, ver);
10,306,433✔
1513

1514
  if (rsp->uidHash != NULL) {
10,306,668✔
1515
    uint64_t* gid = tSimpleHashGet(rsp->uidHash, &submitTbData.uid, LONG_BYTES);
7,646,517✔
1516
    STREAM_CHECK_CONDITION_GOTO(gid == NULL, TDB_CODE_SUCCESS);
7,646,517✔
1517
    ST_TASK_DLOG("%s get uid gid from uidHash, uid:%" PRId64 ", suid:%" PRId64 " gid:%"PRIu64, __func__, submitTbData.uid, submitTbData.suid, *gid);
7,646,517✔
1518
    id = *gid;
7,645,855✔
1519
  } else {
1520
    STREAM_CHECK_CONDITION_GOTO(!uidInTableListSet(sStreamReaderInfo, submitTbData.suid, submitTbData.uid, &id, rsp->isCalc), TDB_CODE_SUCCESS);
2,661,043✔
1521
  }
1522

1523
  walMeta.id = id;
9,360,385✔
1524
  STimeWindow window = {.skey = INT64_MIN, .ekey = INT64_MAX};
9,360,385✔
1525

1526
  if (ranges != NULL){
9,361,047✔
1527
    void* timerange = tSimpleHashGet(ranges, &id, sizeof(id));
7,645,855✔
1528
    if (timerange == NULL) goto end;;
7,646,517✔
1529
    int64_t* pRange = (int64_t*)timerange;
7,646,517✔
1530
    window.skey = pRange[0];
7,646,517✔
1531
    window.ekey = pRange[1];
7,646,517✔
1532
    ST_TASK_DLOG("%s get time range from ranges, uid:%" PRId64 ", suid:%" PRId64 ", gid:%" PRIu64 ", skey:%" PRId64 ", ekey:%" PRId64,
7,646,517✔
1533
      __func__, submitTbData.uid, submitTbData.suid, id, window.skey, window.ekey);
1534
  }
1535
  
1536
  if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) {
9,360,354✔
1537
    ST_TASK_ELOG("vgId:%d %s invalid submit data sver", TD_VID(pVnode), __func__);
×
1538
    code = TSDB_CODE_INVALID_MSG;
×
1539
    TSDB_CHECK_CODE(code, lino, end);
×
1540
  }
1541

1542
  STSchema*    schema = NULL;
9,360,354✔
1543
  STREAM_CHECK_RET_GOTO(getSchemas(pVnode, submitTbData.suid, submitTbData.uid, submitTbData.sver, sStreamReaderInfo, &schema));
9,359,462✔
1544

1545
  SStreamWalDataSlice* pSlice = (SStreamWalDataSlice*)tSimpleHashGet(rsp->indexHash, &submitTbData.uid, LONG_BYTES);
9,360,826✔
1546
  int32_t blockStart = 0;
9,361,047✔
1547
  int32_t numOfRows = 0;
9,361,047✔
1548
  if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
9,360,814✔
1549
    uint64_t nColData = 0;
×
1550
    if (tDecodeU64v(pCoder, &nColData) < 0) {
×
1551
      ST_TASK_ELOG("vgId:%d %s invalid submit data nColData", TD_VID(pVnode), __func__);
×
1552
      code = TSDB_CODE_INVALID_MSG;
×
1553
      TSDB_CHECK_CODE(code, lino, end);
×
1554
    }
1555

1556
    SColData colData = {0};
×
1557
    code = tDecodeColData(version, pCoder, &colData, false);
×
1558
    if (code) {
×
1559
      ST_TASK_ELOG("vgId:%d %s invalid submit data colData", TD_VID(pVnode), __func__);
×
1560
      code = TSDB_CODE_INVALID_MSG;
×
1561
      TSDB_CHECK_CODE(code, lino, end);
×
1562
    }
1563

1564
    if (colData.flag != HAS_VALUE) {
×
1565
      ST_TASK_ELOG("vgId:%d %s invalid submit data colData flag", TD_VID(pVnode), __func__);
×
1566
      code = TSDB_CODE_INVALID_MSG;
×
1567
      TSDB_CHECK_CODE(code, lino, end);
×
1568
    }
1569
    
1570
    walMeta.skey = ((TSKEY *)colData.pData)[0];
×
1571
    walMeta.ekey = ((TSKEY *)colData.pData)[colData.nVal - 1];
×
1572

1573
    int32_t rowStart = 0;
×
1574
    int32_t rowEnd = 0;
×
1575
    STREAM_CHECK_RET_GOTO(getRowRange(&colData, &window, &rowStart, &rowEnd, &numOfRows));
×
1576
    STREAM_CHECK_CONDITION_GOTO(numOfRows <= 0, TDB_CODE_SUCCESS);
×
1577

1578
    STREAM_CHECK_NULL_GOTO(pSlice, TSDB_CODE_INVALID_PARA);
×
1579
    blockStart = pSlice->currentRowIdx;
×
1580
    int32_t pos = pCoder->pos;
×
1581
    for (int16_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
×
1582
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
×
1583
      STREAM_CHECK_NULL_GOTO(pColData, terrno);
×
1584
      if (pColData->info.colId <= -1) {
×
1585
        pColData->hasNull = true;
×
1586
        continue;
×
1587
      }
1588
      if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
×
1589
        STREAM_CHECK_RET_GOTO(setColData(blockStart, rowStart, rowEnd, &colData, pColData));
×
1590
        continue;
×
1591
      }
1592

1593
      pCoder->pos = pos;
×
1594

1595
      int16_t colId = 0;
×
1596
      if (sStreamReaderInfo->isVtableStream){
×
1597
        STREAM_CHECK_RET_GOTO(getColId(submitTbData.suid, submitTbData.uid, i, sStreamReaderInfo, rsp, &colId));
×
1598
        ST_TASK_TLOG("%s vtable colId:%d, i:%d, uid:%" PRId64, __func__, colId, i, submitTbData.uid);
×
1599
      } else {
1600
        colId = pColData->info.colId;
×
1601
      }
1602
      
1603
      uint64_t j = 1;
×
1604
      for (; j < nColData; j++) {
×
1605
        int16_t cid = 0;
×
1606
        int32_t posTmp = pCoder->pos;
×
1607
        pCoder->pos += INT_BYTES;
×
1608
        if ((code = tDecodeI16v(pCoder, &cid))) return code;
×
1609
        pCoder->pos = posTmp;
×
1610
        if (cid == colId) {
×
1611
          SColData colDataTmp = {0};
×
1612
          code = tDecodeColData(version, pCoder, &colDataTmp, false);
×
1613
          if (code) {
×
1614
            code = TSDB_CODE_INVALID_MSG;
×
1615
            TSDB_CHECK_CODE(code, lino, end);
×
1616
          }
1617
          STREAM_CHECK_RET_GOTO(setColData(blockStart, rowStart, rowEnd, &colDataTmp, pColData));
×
1618
          break;
×
1619
        }
1620
        code = tDecodeColData(version, pCoder, &colData, true);
×
1621
        if (code) {
×
1622
          code = TSDB_CODE_INVALID_MSG;
×
1623
          TSDB_CHECK_CODE(code, lino, end);
×
1624
        }
1625
      }
1626
      if (j == nColData) {
×
1627
        colDataSetNNULL(pColData, blockStart, numOfRows);
×
1628
      }
1629
    }
1630
  } else {
1631
    uint64_t nRow = 0;
9,360,814✔
1632
    if (tDecodeU64v(pCoder, &nRow) < 0) {
9,360,814✔
1633
      code = TSDB_CODE_INVALID_MSG;
×
1634
      TSDB_CHECK_CODE(code, lino, end);
×
1635
    }
1636
    for (uint64_t iRow = 0; iRow < nRow; ++iRow) {
19,966,748✔
1637
      SRow *pRow = (SRow *)(pCoder->data + pCoder->pos);
10,604,879✔
1638
      pCoder->pos += pRow->len;
10,606,203✔
1639

1640
      if (iRow == 0){
10,606,436✔
1641
#ifndef NO_UNALIGNED_ACCESS
1642
        walMeta.skey = pRow->ts;
9,359,925✔
1643
#else
1644
        walMeta.skey = taosGetInt64Aligned(&pRow->ts);
1645
#endif
1646
      }
1647
      if (iRow == nRow - 1) {
10,605,311✔
1648
#ifndef NO_UNALIGNED_ACCESS
1649
        walMeta.ekey = pRow->ts;
9,359,048✔
1650
#else
1651
        walMeta.ekey = taosGetInt64Aligned(&pRow->ts);
1652
#endif
1653
      }
1654

1655
      if (pRow->ts < window.skey || pRow->ts > window.ekey) {
10,606,415✔
1656
        continue;
7,738✔
1657
      }
1658
      STREAM_CHECK_NULL_GOTO(pSlice, TSDB_CODE_INVALID_PARA);
10,597,107✔
1659
      blockStart = pSlice->currentRowIdx;
10,597,107✔
1660
     
1661
      for (int16_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {  // reader todo test null
65,972,852✔
1662
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
55,370,220✔
1663
        STREAM_CHECK_NULL_GOTO(pColData, terrno);
55,368,294✔
1664
        if (pColData->info.colId <= -1) {
55,368,294✔
1665
          pColData->hasNull = true;
18,876,362✔
1666
          continue;
18,876,362✔
1667
        }
1668
        int16_t colId = 0;
36,502,021✔
1669
        if (sStreamReaderInfo->isVtableStream){
36,502,484✔
1670
          STREAM_CHECK_RET_GOTO(getColId(submitTbData.suid, submitTbData.uid, i, sStreamReaderInfo, rsp, &colId));
19,213,058✔
1671
          ST_TASK_TLOG("%s vtable colId:%d, i:%d, uid:%" PRId64, __func__, colId, i, submitTbData.uid);
19,217,729✔
1672
        } else {
1673
          colId = pColData->info.colId;
17,289,628✔
1674
        }
1675
        
1676
        SColVal colVal = {0};
36,507,821✔
1677
        int32_t sourceIdx = 0;
36,506,497✔
1678
        while (1) {
1679
          if (sourceIdx >= schema->numOfCols) {
99,337,167✔
1680
            break;
8,536,828✔
1681
          }
1682
          STREAM_CHECK_RET_GOTO(tRowGet(pRow, schema, sourceIdx, &colVal));
90,793,971✔
1683
          if (colVal.cid == colId) {
90,801,555✔
1684
            break;
27,970,885✔
1685
          }
1686
          sourceIdx++;
62,830,670✔
1687
        }
1688
        if (colVal.cid == colId && COL_VAL_IS_VALUE(&colVal)) {
36,507,713✔
1689
          if (IS_VAR_DATA_TYPE(colVal.value.type) || colVal.value.type == TSDB_DATA_TYPE_DECIMAL){
25,837,079✔
1690
            STREAM_CHECK_RET_GOTO(varColSetVarData(pColData, blockStart+ numOfRows, (const char*)colVal.value.pData, colVal.value.nData, !COL_VAL_IS_VALUE(&colVal)));
25,830✔
1691
            ST_TASK_TLOG("%s vtable colId:%d, i:%d, colData:%p, data:%s, len:%d, rowIndex:%d, offset:%d, uid:%" PRId64, __func__, colId, i, pColData, 
26,154✔
1692
              (const char*)colVal.value.pData, colVal.value.nData, blockStart+ numOfRows, pColData->varmeta.offset[blockStart+ numOfRows], submitTbData.uid);
1693
          } else {
1694
            STREAM_CHECK_RET_GOTO(colDataSetVal(pColData, blockStart + numOfRows, (const char*)(&(colVal.value.val)), !COL_VAL_IS_VALUE(&colVal)));
25,811,785✔
1695
          }
1696
        } else {
1697
          colDataSetNULL(pColData, blockStart + numOfRows);
10,670,634✔
1698
        }
1699
      }
1700
      
1701
      numOfRows++;
10,598,196✔
1702
    }
1703
  }
1704

1705
  if (numOfRows > 0) {
9,359,648✔
1706
    if (!sStreamReaderInfo->isVtableStream) {
9,359,648✔
1707
      STREAM_CHECK_RET_GOTO(processTag(sStreamReaderInfo, rsp->isCalc, submitTbData.uid, pBlock, blockStart, numOfRows, 1));
3,175,117✔
1708
    }
1709
    
1710
    SColumnInfoData* pColData = taosArrayGetLast(pBlock->pDataBlock);
9,360,099✔
1711
    STREAM_CHECK_NULL_GOTO(pColData, terrno);
9,359,664✔
1712
    STREAM_CHECK_RET_GOTO(colDataSetNItems(pColData, blockStart, (const char*)&ver, numOfRows, 1, false));
9,359,664✔
1713

1714
    STREAM_CHECK_NULL_GOTO(pSlice, TSDB_CODE_INVALID_PARA);
9,360,104✔
1715
    ST_TASK_DLOG("%s process submit data:skey %" PRId64 ", ekey %" PRId64 ", id %" PRIu64
9,360,104✔
1716
      ", uid:%" PRId64 ", ver:%"PRId64 ", row index:%d, rows:%d", __func__, window.skey, window.ekey, 
1717
      id, submitTbData.uid, ver, pSlice->currentRowIdx, numOfRows);
1718
    pSlice->currentRowIdx += numOfRows;
9,360,339✔
1719
    pBlock->info.rows += numOfRows;
9,360,354✔
1720
  } else {
1721
    ST_TASK_DLOG("%s no valid data in time range:skey %" PRId64 ", ekey %" PRId64 ", uid:%" PRId64 ", suid:%" PRId64,
×
1722
      __func__, window.skey, window.ekey, submitTbData.uid, submitTbData.suid);
1723
  }
1724
  
1725
  if (gidHash == NULL) goto end;
9,360,354✔
1726

1727
  WalMetaResult* data = (WalMetaResult*)tSimpleHashGet(gidHash, &walMeta.id, LONG_BYTES);
1,714,530✔
1728
  if (data != NULL) {
1,714,067✔
1729
    if (walMeta.skey < data->skey) data->skey = walMeta.skey;
×
1730
    if (walMeta.ekey > data->ekey) data->ekey = walMeta.ekey;
×
1731
  } else {
1732
    STREAM_CHECK_RET_GOTO(tSimpleHashPut(gidHash, &walMeta.id, LONG_BYTES, &walMeta, sizeof(WalMetaResult)));
1,714,067✔
1733
  }
1734

1735
end:
10,303,976✔
1736
  if (code != 0) {                                                             \
10,307,094✔
1737
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code)); \
×
1738
  }
1739
  tEndDecode(pCoder);
10,307,094✔
1740
  return code;
10,307,557✔
1741
}
1742
static int32_t scanSubmitData(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamReaderInfo,
10,306,608✔
1743
  void* data, int32_t len, SSHashObj* ranges, SSTriggerWalNewRsp* rsp, int64_t ver) {
1744
  int32_t  code = 0;
10,306,608✔
1745
  int32_t  lino = 0;
10,306,608✔
1746
  SDecoder decoder = {0};
10,306,608✔
1747
  SSHashObj* gidHash = NULL;
10,306,841✔
1748
  void* pTask = sStreamReaderInfo->pTask;
10,306,841✔
1749

1750
  tDecoderInit(&decoder, data, len);
10,307,307✔
1751
  if (tStartDecode(&decoder) < 0) {
10,307,340✔
1752
    code = TSDB_CODE_INVALID_MSG;
×
1753
    TSDB_CHECK_CODE(code, lino, end);
×
1754
  }
1755

1756
  uint64_t nSubmitTbData = 0;
10,307,324✔
1757
  if (tDecodeU64v(&decoder, &nSubmitTbData) < 0) {
10,307,324✔
1758
    code = TSDB_CODE_INVALID_MSG;
×
1759
    TSDB_CHECK_CODE(code, lino, end);
×
1760
  }
1761

1762
  if (rsp->metaBlock != NULL){
10,307,324✔
1763
    gidHash = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
2,660,574✔
1764
    STREAM_CHECK_NULL_GOTO(gidHash, terrno);
2,659,403✔
1765
  }
1766

1767
  for (uint64_t i = 0; i < nSubmitTbData; i++) {
20,612,381✔
1768
    STREAM_CHECK_RET_GOTO(scanSubmitTbData(pVnode, &decoder, sStreamReaderInfo, ranges, gidHash, rsp, ver));
10,306,148✔
1769
  }
1770

1771
  tEndDecode(&decoder);
10,306,233✔
1772

1773
  if (rsp->metaBlock != NULL){
10,306,429✔
1774
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->metaBlock, ((SSDataBlock*)rsp->metaBlock)->info.rows + tSimpleHashGetSize(gidHash)));
2,661,040✔
1775
    int32_t iter = 0;
2,661,273✔
1776
    void*   px = tSimpleHashIterate(gidHash, NULL, &iter);
2,661,273✔
1777
    while (px != NULL) {
4,375,340✔
1778
      WalMetaResult* pMeta = (WalMetaResult*)px;
1,714,297✔
1779
      STREAM_CHECK_RET_GOTO(buildWalMetaBlockNew(rsp->metaBlock, pMeta->id, pMeta->skey, pMeta->ekey, ver));
1,714,297✔
1780
      ((SSDataBlock*)rsp->metaBlock)->info.rows++;
1,714,064✔
1781
      rsp->totalRows++;
1,714,064✔
1782
      ST_TASK_DLOG("%s process meta data:skey %" PRId64 ", ekey %" PRId64 ", id %" PRIu64
1,713,834✔
1783
            ", ver:%"PRId64, __func__, pMeta->skey, pMeta->ekey, pMeta->id, ver);
1784
      px = tSimpleHashIterate(gidHash, px, &iter);
1,713,834✔
1785
    }
1786
  }
1787
  
1788

1789
end:
10,302,425✔
1790
  tSimpleHashCleanup(gidHash);
10,307,327✔
1791
  tDecoderClear(&decoder);
10,306,665✔
1792
  return code;
10,304,881✔
1793
}
1794

1795
static int32_t scanSubmitTbDataPre(SDecoder *pCoder, SStreamTriggerReaderInfo* sStreamReaderInfo, SSHashObj* ranges, 
12,025,894✔
1796
  uint64_t* gid, int64_t* uid, int32_t* numOfRows, SSTriggerWalNewRsp* rsp, int64_t ver) {
1797
  int32_t code = 0;
12,025,894✔
1798
  int32_t lino = 0;
12,025,894✔
1799
  void* pTask = sStreamReaderInfo->pTask;
12,025,894✔
1800

1801
  if (tStartDecode(pCoder) < 0) {
12,026,832✔
1802
    code = TSDB_CODE_INVALID_MSG;
×
1803
    TSDB_CHECK_CODE(code, lino, end);
×
1804
  }
1805

1806
  SSubmitTbData submitTbData = {0};
12,029,362✔
1807
  uint8_t       version = 0;
12,028,465✔
1808
  if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) {
12,029,325✔
1809
    code = TSDB_CODE_INVALID_MSG;
×
1810
    TSDB_CHECK_CODE(code, lino, end);
×
1811
  }
1812
  version = (submitTbData.flags >> 8) & 0xff;
12,029,325✔
1813
  submitTbData.flags = submitTbData.flags & 0xff;
12,029,325✔
1814

1815
  // STREAM_CHECK_CONDITION_GOTO(version < 2, TDB_CODE_SUCCESS);
1816
  if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
12,029,325✔
1817
    submitTbData.pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
772,387✔
1818
    STREAM_CHECK_NULL_GOTO(submitTbData.pCreateTbReq, terrno);
772,154✔
1819
    STREAM_CHECK_RET_GOTO(tDecodeSVCreateTbReq(pCoder, submitTbData.pCreateTbReq));
772,154✔
1820
    STREAM_CHECK_RET_GOTO(processAutoCreateTableNew(sStreamReaderInfo, submitTbData.pCreateTbReq, ver));
772,152✔
1821
  }
1822

1823
  // submit data
1824
  if (tDecodeI64(pCoder, &submitTbData.suid) < 0) {
12,027,076✔
1825
    code = TSDB_CODE_INVALID_MSG;
×
1826
    TSDB_CHECK_CODE(code, lino, end);
×
1827
  }
1828
  if (tDecodeI64(pCoder, uid) < 0) {
12,027,520✔
1829
    code = TSDB_CODE_INVALID_MSG;
×
1830
    TSDB_CHECK_CODE(code, lino, end);
×
1831
  }
1832
  ST_TASK_DLOG("%s uid:%" PRId64 ", suid:%" PRId64, __func__, *uid, submitTbData.suid);
12,027,520✔
1833
  STREAM_CHECK_CONDITION_GOTO(!uidInTableListSet(sStreamReaderInfo, submitTbData.suid, *uid, gid, rsp->isCalc), TDB_CODE_SUCCESS);
12,027,755✔
1834
  if (rsp->uidHash != NULL) {
9,361,047✔
1835
    STREAM_CHECK_RET_GOTO(tSimpleHashPut(rsp->uidHash, uid, LONG_BYTES, gid, LONG_BYTES));
7,646,517✔
1836
    ST_TASK_DLOG("%s put uid into uidHash, uid:%" PRId64 ", suid:%" PRId64 " gid:%"PRIu64, __func__, *uid, submitTbData.suid, *gid);
7,646,287✔
1837
  }
1838
  STimeWindow window = {.skey = INT64_MIN, .ekey = INT64_MAX};
9,360,817✔
1839

1840
  if (ranges != NULL){
9,360,817✔
1841
    void* timerange = tSimpleHashGet(ranges, gid, sizeof(*gid));
7,646,287✔
1842
    if (timerange == NULL) goto end;;
7,645,821✔
1843
    int64_t* pRange = (int64_t*)timerange;
7,645,821✔
1844
    window.skey = pRange[0];
7,645,821✔
1845
    window.ekey = pRange[1];
7,645,821✔
1846
  }
1847
  
1848
  if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) {
9,359,925✔
1849
    code = TSDB_CODE_INVALID_MSG;
×
1850
    TSDB_CHECK_CODE(code, lino, end);
×
1851
  }
1852

1853
  if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
9,359,925✔
1854
    uint64_t nColData = 0;
×
1855
    if (tDecodeU64v(pCoder, &nColData) < 0) {
×
1856
      code = TSDB_CODE_INVALID_MSG;
×
1857
      TSDB_CHECK_CODE(code, lino, end);
×
1858
    }
1859

1860
    SColData colData = {0};
×
1861
    code = tDecodeColData(version, pCoder, &colData, false);
×
1862
    if (code) {
×
1863
      code = TSDB_CODE_INVALID_MSG;
×
1864
      TSDB_CHECK_CODE(code, lino, end);
×
1865
    }
1866

1867
    if (colData.flag != HAS_VALUE) {
×
1868
      code = TSDB_CODE_INVALID_MSG;
×
1869
      TSDB_CHECK_CODE(code, lino, end);
×
1870
    }
1871
    int32_t rowStart = 0;
×
1872
    int32_t rowEnd = 0;
×
1873
    if (window.skey != INT64_MIN || window.ekey != INT64_MAX) {
×
1874
      STREAM_CHECK_RET_GOTO(getRowRange(&colData, &window, &rowStart, &rowEnd, numOfRows));
×
1875
    } else {
1876
      (*numOfRows) = colData.nVal;
×
1877
    } 
1878
  } else {
1879
    uint64_t nRow = 0;
9,359,925✔
1880
    if (tDecodeU64v(pCoder, &nRow) < 0) {
9,360,357✔
1881
      code = TSDB_CODE_INVALID_MSG;
×
1882
      TSDB_CHECK_CODE(code, lino, end);
×
1883
    }
1884

1885
    if (window.skey != INT64_MIN || window.ekey != INT64_MAX) { 
9,360,357✔
1886
      for (uint64_t iRow = 0; iRow < nRow; ++iRow) {
16,335,849✔
1887
        SRow *pRow = (SRow *)(pCoder->data + pCoder->pos);
8,689,559✔
1888
        pCoder->pos += pRow->len;
8,689,792✔
1889
        if (pRow->ts < window.skey || pRow->ts > window.ekey) {
8,689,559✔
1890
          continue;
7,738✔
1891
        }
1892
        (*numOfRows)++;
8,682,054✔
1893
      }
1894
    } else {
1895
      (*numOfRows) = nRow;
1,714,300✔
1896
    }
1897
  }
1898
  
1899
end:
12,029,678✔
1900
  tDestroySVSubmitCreateTbReq(submitTbData.pCreateTbReq, TSDB_MSG_FLG_DECODE);
12,028,982✔
1901
  taosMemoryFreeClear(submitTbData.pCreateTbReq);
12,028,866✔
1902
  tEndDecode(pCoder);
12,028,871✔
1903
  return code;
12,029,101✔
1904
}
1905

1906
static int32_t scanSubmitDataPre(SStreamTriggerReaderInfo* sStreamReaderInfo, void* data, int32_t len, SSHashObj* ranges, SSTriggerWalNewRsp* rsp, int64_t ver) {
12,029,332✔
1907
  int32_t  code = 0;
12,029,332✔
1908
  int32_t  lino = 0;
12,029,332✔
1909
  SDecoder decoder = {0};
12,029,332✔
1910
  void* pTask = sStreamReaderInfo->pTask;
12,029,562✔
1911

1912
  tDecoderInit(&decoder, data, len);
12,029,332✔
1913
  if (tStartDecode(&decoder) < 0) {
12,029,098✔
1914
    code = TSDB_CODE_INVALID_MSG;
×
1915
    TSDB_CHECK_CODE(code, lino, end);
×
1916
  }
1917

1918
  uint64_t nSubmitTbData = 0;
12,029,101✔
1919
  if (tDecodeU64v(&decoder, &nSubmitTbData) < 0) {
12,028,206✔
1920
    code = TSDB_CODE_INVALID_MSG;
×
1921
    TSDB_CHECK_CODE(code, lino, end);
×
1922
  }
1923
  ST_TASK_DLOG("%s nSubmitTbData:%" PRIu64 ", ver:%"PRId64 " bodyLen:%d", __func__, nSubmitTbData, ver, len);
12,028,206✔
1924

1925
  for (int32_t i = 0; i < nSubmitTbData; i++) {
24,057,043✔
1926
    uint64_t gid = -1;
12,028,638✔
1927
    int64_t  uid = 0;
12,028,868✔
1928
    int32_t numOfRows = 0;
12,028,868✔
1929
    STREAM_CHECK_RET_GOTO(scanSubmitTbDataPre(&decoder, sStreamReaderInfo, ranges, &gid, &uid, &numOfRows, rsp, ver));
12,028,206✔
1930
    if (numOfRows <= 0) {
12,028,905✔
1931
      ST_TASK_DLOG("%s no valid data uid:%" PRId64 ", gid:%" PRIu64 ", numOfRows:%d, ver:%"PRId64, __func__, uid, gid, numOfRows, ver);
2,669,210✔
1932
      continue;
2,669,210✔
1933
    }
1934
    rsp->totalRows += numOfRows;
9,359,695✔
1935
    rsp->totalDataRows += numOfRows;
9,360,587✔
1936

1937
    SStreamWalDataSlice* pSlice = (SStreamWalDataSlice*)tSimpleHashGet(rsp->indexHash, &uid, LONG_BYTES);
9,360,587✔
1938
    if (pSlice != NULL) {
9,360,118✔
1939
      pSlice->numRows += numOfRows;
8,724,040✔
1940
      ST_TASK_DLOG("%s again uid:%" PRId64 ", gid:%" PRIu64 ", total numOfRows:%d, hash:%p %d, ver:%"PRId64, __func__, uid, gid, pSlice->numRows, rsp->indexHash, tSimpleHashGetSize(rsp->indexHash), ver);
8,723,795✔
1941
      pSlice->gId = gid;
8,723,795✔
1942
    } else {
1943
      SStreamWalDataSlice tmp = {.gId=gid,.numRows=numOfRows,.currentRowIdx=0,.startRowIdx=0};
636,078✔
1944
      ST_TASK_DLOG("%s first uid:%" PRId64 ", gid:%" PRIu64 ", numOfRows:%d, hash:%p %d, ver:%"PRId64, __func__, uid, gid, tmp.numRows, rsp->indexHash, tSimpleHashGetSize(rsp->indexHash), ver);
636,078✔
1945
      STREAM_CHECK_RET_GOTO(tSimpleHashPut(rsp->indexHash, &uid, LONG_BYTES, &tmp, sizeof(tmp)));
636,078✔
1946
    } 
1947
  }
1948

1949
  tEndDecode(&decoder);
12,028,405✔
1950

1951
end:
12,028,189✔
1952
  tDecoderClear(&decoder);
12,028,189✔
1953
  return code;
12,029,559✔
1954
}
1955

1956
static void buildIndexHash(SSHashObj* indexHash, void* pTask){
519,382✔
1957
  void*   pe = NULL;
519,382✔
1958
  int32_t iter = 0;
519,382✔
1959
  int32_t index = 0;
519,382✔
1960
  while ((pe = tSimpleHashIterate(indexHash, pe, &iter)) != NULL) {
1,155,460✔
1961
    SStreamWalDataSlice* pInfo = (SStreamWalDataSlice*)pe;
636,078✔
1962
    pInfo->startRowIdx = index;
636,078✔
1963
    pInfo->currentRowIdx = index;
636,078✔
1964
    index += pInfo->numRows;
636,078✔
1965
    ST_TASK_DLOG("%s uid:%" PRId64 ", gid:%" PRIu64 ", startRowIdx:%d, numRows:%d", __func__, *(int64_t*)(tSimpleHashGetKey(pe, NULL)),
1,089,729✔
1966
    pInfo->gId, pInfo->startRowIdx, pInfo->numRows);
1967
  }
1968
}
519,382✔
1969

1970
static void printIndexHash(SSHashObj* indexHash, void* pTask){
519,147✔
1971
  if (qDebugFlag & DEBUG_TRACE) {
519,147✔
1972
    void*   pe = NULL;
2,453✔
1973
    int32_t iter = 0;
2,453✔
1974
    while ((pe = tSimpleHashIterate(indexHash, pe, &iter)) != NULL) {
5,335✔
1975
      SStreamWalDataSlice* pInfo = (SStreamWalDataSlice*)pe;
2,882✔
1976
      ST_TASK_TLOG("%s uid:%" PRId64 ", gid:%" PRIu64 ", startRowIdx:%d, numRows:%d", __func__, *(int64_t*)(tSimpleHashGetKey(pe, NULL)),
3,105✔
1977
      pInfo->gId, pInfo->startRowIdx, pInfo->numRows);
1978
    }
1979
  }
1980
}
519,147✔
1981

1982
static void filterIndexHash(SSHashObj* indexHash, SColumnInfoData* pRet){
11,589✔
1983
  void*   pe = NULL;
11,589✔
1984
  int32_t iter = 0;
11,589✔
1985
  int32_t index = 0;
11,589✔
1986
  int32_t pIndex = 0;
11,589✔
1987
  int8_t* pIndicator = (int8_t*)pRet->pData;
11,589✔
1988
  while ((pe = tSimpleHashIterate(indexHash, pe, &iter)) != NULL) {
24,533✔
1989
    SStreamWalDataSlice* pInfo = (SStreamWalDataSlice*)pe;
12,944✔
1990
    pInfo->startRowIdx = index;
12,944✔
1991
    int32_t size = pInfo->numRows;
12,944✔
1992
    for (int32_t i = 0; i < pInfo->numRows; i++) {
186,611✔
1993
      if (pIndicator && !pIndicator[pIndex++]) {
173,667✔
1994
        size--;
58,148✔
1995
      }
1996
    }
1997
    pInfo->numRows = size;
12,944✔
1998
    index += pInfo->numRows;
12,944✔
1999
    stTrace("stream reader re build index hash uid:%" PRId64 ", gid:%" PRIu64 ", startRowIdx:%d, numRows:%d", *(int64_t*)(tSimpleHashGetKey(pe, NULL)),
12,944✔
2000
    pInfo->gId, pInfo->startRowIdx, pInfo->numRows);
2001
  }
2002
}
11,589✔
2003

2004
static int32_t prepareIndexMetaData(SWalReader* pWalReader, SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* resultRsp){
3,266,915✔
2005
  int32_t      code = 0;
3,266,915✔
2006
  int32_t      lino = 0;
3,266,915✔
2007
  void* pTask = sStreamReaderInfo->pTask;
3,266,915✔
2008

2009
  code = walReaderSeekVer(pWalReader, resultRsp->ver);
3,267,611✔
2010
  if (code == TSDB_CODE_WAL_LOG_NOT_EXIST){
3,265,513✔
2011
    if (resultRsp->ver < walGetFirstVer(pWalReader->pWal)) {
2,833,703✔
2012
      resultRsp->ver = walGetFirstVer(pWalReader->pWal);
×
2013
      resultRsp->verTime = 0;
×
2014
    } else {
2015
      resultRsp->verTime = taosGetTimestampUs();
2,834,527✔
2016
    }
2017
    ST_TASK_DLOG("%s scan wal end:%s",  __func__, tstrerror(code));
2,833,850✔
2018
    code = TSDB_CODE_SUCCESS;
2,834,530✔
2019
    goto end;
2,834,530✔
2020
  }
2021
  STREAM_CHECK_RET_GOTO(code);
431,810✔
2022

2023
  while (1) {
4,493,970✔
2024
    code = walNextValidMsg(pWalReader, true);
4,925,780✔
2025
    if (code == TSDB_CODE_WAL_LOG_NOT_EXIST){
4,926,007✔
2026
      resultRsp->verTime = taosGetTimestampUs();
432,042✔
2027
      ST_TASK_DLOG("%s scan wal end:%s", __func__, tstrerror(code));
432,042✔
2028
      code = TSDB_CODE_SUCCESS;
432,042✔
2029
      goto end;
432,042✔
2030
    }
2031
    STREAM_CHECK_RET_GOTO(code);
4,493,735✔
2032
    resultRsp->ver = pWalReader->curVersion;
4,493,735✔
2033
    SWalCont* wCont = &pWalReader->pHead->head;
4,494,203✔
2034
    resultRsp->verTime = wCont->ingestTs;
4,493,735✔
2035
    void*   data = POINTER_SHIFT(wCont->body, sizeof(SMsgHead));
4,493,502✔
2036
    int32_t len = wCont->bodyLen - sizeof(SMsgHead);
4,493,215✔
2037
    int64_t ver = wCont->version;
4,493,349✔
2038
    ST_TASK_DLOG("%s scan wal ver:%" PRId64 ", type:%s, deleteData:%d, deleteTb:%d, msg len:%d", __func__,
4,492,762✔
2039
      ver, TMSG_INFO(wCont->msgType), sStreamReaderInfo->deleteReCalc, sStreamReaderInfo->deleteOutTbl, len);
2040
    if (wCont->msgType == TDMT_VND_SUBMIT) {
4,494,712✔
2041
      data = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
4,383,280✔
2042
      len = wCont->bodyLen - sizeof(SSubmitReq2Msg);
4,383,050✔
2043
      STREAM_CHECK_RET_GOTO(scanSubmitDataPre(sStreamReaderInfo, data, len, NULL, resultRsp, ver));
4,383,510✔
2044
    } else {
2045
      STREAM_CHECK_RET_GOTO(processMeta(wCont->msgType, sStreamReaderInfo, data, len, resultRsp, ver));
110,463✔
2046
    }
2047

2048
    ST_TASK_DLOG("%s scan wal next ver:%" PRId64 ", totalRows:%d", __func__, resultRsp->ver, resultRsp->totalRows);
4,493,735✔
2049
    if (resultRsp->totalRows >= STREAM_RETURN_ROWS_NUM || resultRsp->needReturn) {
4,493,735✔
2050
      break;
2051
    }
2052
  }
2053
  
2054
end:
31✔
2055
  STREAM_PRINT_LOG_END(code, lino);
3,266,401✔
2056
  return code;
3,266,115✔
2057
}
2058

2059
static int32_t prepareIndexData(SWalReader* pWalReader, SStreamTriggerReaderInfo* sStreamReaderInfo, 
8,373,679✔
2060
  SArray* versions, SSHashObj* ranges, SSTriggerWalNewRsp* rsp){
2061
  int32_t      code = 0;
8,373,679✔
2062
  int32_t      lino = 0;
8,373,679✔
2063
  void* pTask = sStreamReaderInfo->pTask;
8,373,679✔
2064

2065
  for(int32_t i = 0; i < taosArrayGetSize(versions); i++) {
16,020,858✔
2066
    int64_t *ver = taosArrayGet(versions, i);
7,646,517✔
2067
    if (ver == NULL) continue;
7,646,517✔
2068

2069
    STREAM_CHECK_RET_GOTO(walFetchHead(pWalReader, *ver));
7,646,517✔
2070
    if(pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
7,646,282✔
2071
      TAOS_CHECK_RETURN(walSkipFetchBody(pWalReader));
×
2072
      ST_TASK_TLOG("%s not data, skip, ver:%"PRId64, __func__, *ver);
×
2073
      continue;
×
2074
    }
2075
    STREAM_CHECK_RET_GOTO(walFetchBody(pWalReader));
7,646,282✔
2076

2077
    SWalCont* wCont = &pWalReader->pHead->head;
7,646,049✔
2078
    void*   pBody = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
7,646,284✔
2079
    int32_t bodyLen = wCont->bodyLen - sizeof(SSubmitReq2Msg);
7,646,284✔
2080

2081
    STREAM_CHECK_RET_GOTO(scanSubmitDataPre(sStreamReaderInfo, pBody, bodyLen, ranges, rsp, *ver));
7,646,284✔
2082
  }
2083
  
2084
end:
8,373,651✔
2085
  return code;
8,373,651✔
2086
}
2087

2088
static int32_t filterData(SSTriggerWalNewRsp* resultRsp, SStreamTriggerReaderInfo* sStreamReaderInfo) {
519,382✔
2089
  int32_t      code = 0;
519,382✔
2090
  int32_t       lino = 0;
519,382✔
2091
  SColumnInfoData* pRet = NULL;
519,382✔
2092

2093
  int64_t totalRows = ((SSDataBlock*)resultRsp->dataBlock)->info.rows;
519,382✔
2094
  STREAM_CHECK_RET_GOTO(qStreamFilter(((SSDataBlock*)resultRsp->dataBlock), sStreamReaderInfo->pFilterInfo, &pRet));
519,382✔
2095

2096
  if (((SSDataBlock*)resultRsp->dataBlock)->info.rows < totalRows) {
519,382✔
2097
    filterIndexHash(resultRsp->indexHash, pRet);
11,589✔
2098
  }
2099

2100
end:
519,382✔
2101
  colDataDestroy(pRet);
519,382✔
2102
  taosMemoryFree(pRet);
519,382✔
2103
  return code;
519,382✔
2104
}
2105

2106
static int32_t processWalVerMetaDataNew(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamReaderInfo, 
3,265,978✔
2107
                                    SSTriggerWalNewRsp* resultRsp) {
2108
  int32_t      code = 0;
3,265,978✔
2109
  int32_t      lino = 0;
3,265,978✔
2110
  void* pTask = sStreamReaderInfo->pTask;
3,265,978✔
2111
                                        
2112
  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
3,267,240✔
2113
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);
3,266,675✔
2114
  blockDataEmpty(resultRsp->dataBlock);
3,266,675✔
2115
  blockDataEmpty(resultRsp->metaBlock);
3,262,216✔
2116
  int64_t lastVer = resultRsp->ver;                                      
3,262,158✔
2117
  STREAM_CHECK_RET_GOTO(prepareIndexMetaData(pWalReader, sStreamReaderInfo, resultRsp));
3,262,158✔
2118
  STREAM_CHECK_CONDITION_GOTO(resultRsp->totalRows == 0, TDB_CODE_SUCCESS);
3,266,572✔
2119

2120
  buildIndexHash(resultRsp->indexHash, pTask);
123,569✔
2121
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(((SSDataBlock*)resultRsp->dataBlock), resultRsp->totalRows));
123,569✔
2122
  while(lastVer < resultRsp->ver) {
2,827,836✔
2123
    STREAM_CHECK_RET_GOTO(walFetchHead(pWalReader, lastVer++));
2,704,267✔
2124
    if(pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
2,704,528✔
2125
      TAOS_CHECK_RETURN(walSkipFetchBody(pWalReader));
43,460✔
2126
      continue;
43,460✔
2127
    }
2128
    STREAM_CHECK_RET_GOTO(walFetchBody(pWalReader));
2,661,270✔
2129
    SWalCont* wCont = &pWalReader->pHead->head;
2,661,242✔
2130
    void*   pBody = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
2,661,242✔
2131
    int32_t bodyLen = wCont->bodyLen - sizeof(SSubmitReq2Msg);
2,660,997✔
2132
    ST_TASK_DLOG("process wal ver:%" PRId64 ", type:%d, bodyLen:%d", wCont->version, wCont->msgType, bodyLen);
2,660,997✔
2133
    STREAM_CHECK_RET_GOTO(scanSubmitData(pVnode, sStreamReaderInfo, pBody, bodyLen, NULL, resultRsp, wCont->version));
2,661,258✔
2134
  }
2135

2136
  int32_t metaRows = resultRsp->totalRows - ((SSDataBlock*)resultRsp->dataBlock)->info.rows;
123,569✔
2137
  STREAM_CHECK_RET_GOTO(filterData(resultRsp, sStreamReaderInfo));
123,569✔
2138
  resultRsp->totalRows = ((SSDataBlock*)resultRsp->dataBlock)->info.rows + metaRows;
123,569✔
2139

2140
end:
3,265,416✔
2141
  ST_TASK_DLOG("vgId:%d %s end, get result totalRows:%d, process:%"PRId64"/%"PRId64, TD_VID(pVnode), __func__, 
3,265,416✔
2142
          resultRsp->totalRows, resultRsp->ver, walGetAppliedVer(pWalReader->pWal));
2143
  walCloseReader(pWalReader);
3,265,416✔
2144
  return code;
3,266,106✔
2145
}
2146

2147
static int32_t processWalVerDataNew(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamReaderInfo, 
8,374,108✔
2148
                                    SArray* versions, SSHashObj* ranges, SSTriggerWalNewRsp* rsp) {
2149
  int32_t      code = 0;
8,374,108✔
2150
  int32_t      lino = 0;
8,374,108✔
2151

2152
  void* pTask = sStreamReaderInfo->pTask;
8,374,108✔
2153
  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
8,374,571✔
2154
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);
8,373,449✔
2155
  
2156
  if (taosArrayGetSize(versions) > 0) {
8,373,449✔
2157
    rsp->ver = *(int64_t*)taosArrayGetLast(versions);
395,813✔
2158
  }
2159
  
2160
  STREAM_CHECK_RET_GOTO(prepareIndexData(pWalReader, sStreamReaderInfo, versions, ranges, rsp));
8,373,449✔
2161
  STREAM_CHECK_CONDITION_GOTO(rsp->totalRows == 0, TDB_CODE_SUCCESS);
8,373,651✔
2162

2163
  ST_TASK_TLOG("%s index hash:%p %d", __func__, rsp->indexHash, tSimpleHashGetSize(rsp->indexHash));
395,813✔
2164
  buildIndexHash(rsp->indexHash, pTask);
395,813✔
2165

2166
  blockDataEmpty(rsp->dataBlock);
395,813✔
2167
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->dataBlock, rsp->totalRows));
395,813✔
2168

2169
  for(int32_t i = 0; i < taosArrayGetSize(versions); i++) {
8,041,673✔
2170
    int64_t *ver = taosArrayGet(versions, i);
7,645,855✔
2171
    if (ver == NULL) continue;
7,645,643✔
2172
    ST_TASK_TLOG("vgId:%d %s scan wal process:%"PRId64"/%"PRId64, TD_VID(pVnode), __func__, *ver, walGetAppliedVer(pWalReader->pWal));
7,645,643✔
2173

2174
    STREAM_CHECK_RET_GOTO(walFetchHead(pWalReader, *ver));
7,645,643✔
2175
    if(pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
7,646,517✔
2176
      TAOS_CHECK_RETURN(walSkipFetchBody(pWalReader));
×
2177
      continue;
×
2178
    }
2179
    STREAM_CHECK_RET_GOTO(walFetchBody(pWalReader));
7,646,282✔
2180
    SWalCont* wCont = &pWalReader->pHead->head;
7,645,804✔
2181
    void*   pBody = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
7,645,804✔
2182
    int32_t bodyLen = wCont->bodyLen - sizeof(SSubmitReq2Msg);
7,645,804✔
2183

2184
    STREAM_CHECK_RET_GOTO(scanSubmitData(pVnode, sStreamReaderInfo, pBody, bodyLen, ranges, rsp, wCont->version));
7,645,804✔
2185
  }
2186
  // printDataBlock(rsp->dataBlock, __func__, "processWalVerDataNew");
2187
  STREAM_CHECK_RET_GOTO(filterData(rsp, sStreamReaderInfo));
395,580✔
2188
  rsp->totalRows = ((SSDataBlock*)rsp->dataBlock)->info.rows;
395,813✔
2189

2190
end:
8,373,651✔
2191
  ST_TASK_DLOG("vgId:%d %s end, get result totalRows:%d, process:%"PRId64"/%"PRId64, TD_VID(pVnode), __func__, 
8,373,651✔
2192
            rsp->totalRows, rsp->ver, walGetAppliedVer(pWalReader->pWal));
2193
  walCloseReader(pWalReader);
8,374,313✔
2194
  return code;
8,373,881✔
2195
}
2196

2197
static int32_t buildScheamFromMeta(SVnode* pVnode, int64_t uid, SArray** schemas, SStorageAPI* api) {
347,460✔
2198
  int32_t code = 0;
347,460✔
2199
  int32_t lino = 0;
347,460✔
2200
  SMetaReader metaReader = {0};
347,460✔
2201
  *schemas = taosArrayInit(8, sizeof(SSchema));
347,460✔
2202
  STREAM_CHECK_NULL_GOTO(*schemas, terrno);
347,460✔
2203
  
2204
  api->metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api->metaFn);
347,460✔
2205
  STREAM_CHECK_RET_GOTO(api->metaReaderFn.getTableEntryByUid(&metaReader, uid));
347,460✔
2206

2207
  SSchemaWrapper* sSchemaWrapper = NULL;
345,474✔
2208
  if (metaReader.me.type == TD_CHILD_TABLE) {
345,474✔
2209
    int64_t suid = metaReader.me.ctbEntry.suid;
345,474✔
2210
    tDecoderClear(&metaReader.coder);
345,474✔
2211
    STREAM_CHECK_RET_GOTO(api->metaReaderFn.getTableEntryByUid(&metaReader, suid));
345,474✔
2212
    sSchemaWrapper = &metaReader.me.stbEntry.schemaRow;
345,474✔
2213
  } else if (metaReader.me.type == TD_NORMAL_TABLE) {
×
2214
    sSchemaWrapper = &metaReader.me.ntbEntry.schemaRow;
×
2215
  } else {
2216
    qError("invalid table type:%d", metaReader.me.type);
×
2217
  }
2218

2219
  for (size_t j = 0; j < sSchemaWrapper->nCols; j++) {
2,028,466✔
2220
    SSchema* s = sSchemaWrapper->pSchema + j;
1,682,992✔
2221
    STREAM_CHECK_NULL_GOTO(taosArrayPush(*schemas, s), terrno);
3,365,984✔
2222
  }
2223

2224
end:
347,460✔
2225
  api->metaReaderFn.clearReader(&metaReader);
347,460✔
2226
  STREAM_PRINT_LOG_END(code, lino);
347,460✔
2227
  if (code != 0)  {
347,460✔
2228
    taosArrayDestroy(*schemas);
1,986✔
2229
    *schemas = NULL;
1,986✔
2230
  }
2231
  return code;
347,460✔
2232
}
2233

2234
static int32_t shrinkScheams(SArray* cols, SArray* schemas) {
345,474✔
2235
  int32_t code = 0;
345,474✔
2236
  int32_t lino = 0;
345,474✔
2237
  size_t  schemaLen = taosArrayGetSize(schemas);
345,474✔
2238
  STREAM_CHECK_RET_GOTO(taosArrayEnsureCap(schemas, schemaLen + taosArrayGetSize(cols)));
345,474✔
2239
  for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
1,341,830✔
2240
    col_id_t* id = taosArrayGet(cols, i);
996,356✔
2241
    STREAM_CHECK_NULL_GOTO(id, terrno);
996,356✔
2242
    for (size_t i = 0; i < schemaLen; i++) {
2,595,684✔
2243
      SSchema* s = taosArrayGet(schemas, i);
2,595,684✔
2244
      STREAM_CHECK_NULL_GOTO(s, terrno);
2,595,684✔
2245
      if (*id == s->colId) {
2,595,684✔
2246
        STREAM_CHECK_NULL_GOTO(taosArrayPush(schemas, s), terrno);
996,356✔
2247
        break;
996,356✔
2248
      }
2249
    }
2250
  }
2251
  taosArrayPopFrontBatch(schemas, schemaLen);
345,474✔
2252

2253
end:
345,474✔
2254
  return code;
345,474✔
2255
}
2256

2257
static int32_t createTSAndCondition(int64_t start, int64_t end, SLogicConditionNode** pCond,
×
2258
                                    STargetNode* pTargetNodeTs) {
2259
  int32_t code = 0;
×
2260
  int32_t lino = 0;
×
2261

2262
  SColumnNode*         pCol = NULL;
×
2263
  SColumnNode*         pCol1 = NULL;
×
2264
  SValueNode*          pVal = NULL;
×
2265
  SValueNode*          pVal1 = NULL;
×
2266
  SOperatorNode*       op = NULL;
×
2267
  SOperatorNode*       op1 = NULL;
×
2268
  SLogicConditionNode* cond = NULL;
×
2269

2270
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pCol));
×
2271
  pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
×
2272
  pCol->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
×
2273
  pCol->node.resType.bytes = LONG_BYTES;
×
2274
  pCol->slotId = pTargetNodeTs->slotId;
×
2275
  pCol->dataBlockId = pTargetNodeTs->dataBlockId;
×
2276

2277
  STREAM_CHECK_RET_GOTO(nodesCloneNode((SNode*)pCol, (SNode**)&pCol1));
×
2278

2279
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&pVal));
×
2280
  pVal->node.resType.type = TSDB_DATA_TYPE_BIGINT;
×
2281
  pVal->node.resType.bytes = LONG_BYTES;
×
2282
  pVal->datum.i = start;
×
2283
  pVal->typeData = start;
×
2284

2285
  STREAM_CHECK_RET_GOTO(nodesCloneNode((SNode*)pVal, (SNode**)&pVal1));
×
2286
  pVal1->datum.i = end;
×
2287
  pVal1->typeData = end;
×
2288

2289
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&op));
×
2290
  op->opType = OP_TYPE_GREATER_EQUAL;
×
2291
  op->node.resType.type = TSDB_DATA_TYPE_BOOL;
×
2292
  op->node.resType.bytes = CHAR_BYTES;
×
2293
  op->pLeft = (SNode*)pCol;
×
2294
  op->pRight = (SNode*)pVal;
×
2295
  pCol = NULL;
×
2296
  pVal = NULL;
×
2297

2298
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&op1));
×
2299
  op1->opType = OP_TYPE_LOWER_EQUAL;
×
2300
  op1->node.resType.type = TSDB_DATA_TYPE_BOOL;
×
2301
  op1->node.resType.bytes = CHAR_BYTES;
×
2302
  op1->pLeft = (SNode*)pCol1;
×
2303
  op1->pRight = (SNode*)pVal1;
×
2304
  pCol1 = NULL;
×
2305
  pVal1 = NULL;
×
2306

2307
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_LOGIC_CONDITION, (SNode**)&cond));
×
2308
  cond->condType = LOGIC_COND_TYPE_AND;
×
2309
  cond->node.resType.type = TSDB_DATA_TYPE_BOOL;
×
2310
  cond->node.resType.bytes = CHAR_BYTES;
×
2311
  STREAM_CHECK_RET_GOTO(nodesMakeList(&cond->pParameterList));
×
2312
  STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)op));
×
2313
  op = NULL;
×
2314
  STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)op1));
×
2315
  op1 = NULL;
×
2316

2317
  *pCond = cond;
×
2318

2319
end:
×
2320
  if (code != 0) {
×
2321
    nodesDestroyNode((SNode*)pCol);
×
2322
    nodesDestroyNode((SNode*)pCol1);
×
2323
    nodesDestroyNode((SNode*)pVal);
×
2324
    nodesDestroyNode((SNode*)pVal1);
×
2325
    nodesDestroyNode((SNode*)op);
×
2326
    nodesDestroyNode((SNode*)op1);
×
2327
    nodesDestroyNode((SNode*)cond);
×
2328
  }
2329
  STREAM_PRINT_LOG_END(code, lino);
×
2330

2331
  return code;
×
2332
}
2333

2334
/*
2335
static int32_t createExternalConditions(SStreamRuntimeFuncInfo* data, SLogicConditionNode** pCond, STargetNode* pTargetNodeTs, STimeRangeNode* node) {
2336
  int32_t              code = 0;
2337
  int32_t              lino = 0;
2338
  SLogicConditionNode* pAndCondition = NULL;
2339
  SLogicConditionNode* cond = NULL;
2340

2341
  if (pTargetNodeTs == NULL) {
2342
    vError("stream reader %s no ts column", __func__);
2343
    return TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN;
2344
  }
2345
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_LOGIC_CONDITION, (SNode**)&cond));
2346
  cond->condType = LOGIC_COND_TYPE_OR;
2347
  cond->node.resType.type = TSDB_DATA_TYPE_BOOL;
2348
  cond->node.resType.bytes = CHAR_BYTES;
2349
  STREAM_CHECK_RET_GOTO(nodesMakeList(&cond->pParameterList));
2350

2351
  for (int i = 0; i < taosArrayGetSize(data->pStreamPesudoFuncVals); ++i) {
2352
    data->curIdx = i;
2353

2354
    SReadHandle handle = {0};
2355
    calcTimeRange(node, data, &handle.winRange, &handle.winRangeValid);
2356
    if (!handle.winRangeValid) {
2357
      stError("stream reader %s invalid time range, skey:%" PRId64 ", ekey:%" PRId64, __func__, handle.winRange.skey,
2358
              handle.winRange.ekey);
2359
      continue;
2360
    }
2361
    STREAM_CHECK_RET_GOTO(createTSAndCondition(handle.winRange.skey, handle.winRange.ekey, &pAndCondition, pTargetNodeTs));
2362
    stDebug("%s create condition skey:%" PRId64 ", eksy:%" PRId64, __func__, handle.winRange.skey, handle.winRange.ekey);
2363
    STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)pAndCondition));
2364
    pAndCondition = NULL;
2365
  }
2366

2367
  *pCond = cond;
2368

2369
end:
2370
  if (code != 0) {
2371
    nodesDestroyNode((SNode*)pAndCondition);
2372
    nodesDestroyNode((SNode*)cond);
2373
  }
2374
  STREAM_PRINT_LOG_END(code, lino);
2375

2376
  return code;
2377
}
2378
*/
2379

2380
static int32_t processCalaTimeRange(SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo, SResFetchReq* req,
407,554✔
2381
                                    STimeRangeNode* node, SReadHandle* handle, bool isExtWin) {
2382
  int32_t code = 0;
407,554✔
2383
  int32_t lino = 0;
407,554✔
2384
  void* pTask = sStreamReaderCalcInfo->pTask;
407,554✔
2385
  STimeWindow* pWin = isExtWin ? &handle->extWinRange : &handle->winRange;
407,554✔
2386
  bool* pValid = isExtWin ? &handle->extWinRangeValid : &handle->winRangeValid;
407,324✔
2387
  
2388
  if (req->pStRtFuncInfo->withExternalWindow) {
407,554✔
2389
    sStreamReaderCalcInfo->tmpRtFuncInfo.curIdx = 0;
306,493✔
2390
    sStreamReaderCalcInfo->tmpRtFuncInfo.triggerType = req->pStRtFuncInfo->triggerType;
306,493✔
2391
    sStreamReaderCalcInfo->tmpRtFuncInfo.isWindowTrigger = req->pStRtFuncInfo->isWindowTrigger;
306,493✔
2392
    sStreamReaderCalcInfo->tmpRtFuncInfo.precision = req->pStRtFuncInfo->precision;
306,493✔
2393

2394
    SSTriggerCalcParam* pFirst = taosArrayGet(req->pStRtFuncInfo->pStreamPesudoFuncVals, 0);
306,493✔
2395
    SSTriggerCalcParam* pLast = taosArrayGetLast(req->pStRtFuncInfo->pStreamPesudoFuncVals);
306,493✔
2396
    STREAM_CHECK_NULL_GOTO(pFirst, terrno);
306,493✔
2397
    STREAM_CHECK_NULL_GOTO(pLast, terrno);
306,493✔
2398

2399
    if (!node->needCalc) {
306,493✔
2400
      pWin->skey = pFirst->wstart;
203,486✔
2401
      pWin->ekey = pLast->wend;
203,486✔
2402
      *pValid = true;
203,486✔
2403
      if (req->pStRtFuncInfo->triggerType == STREAM_TRIGGER_SLIDING) {
203,486✔
2404
        pWin->ekey--;
137,933✔
2405
      }
2406
    } else {
2407
      SSTriggerCalcParam* pTmp = taosArrayGet(sStreamReaderCalcInfo->tmpRtFuncInfo.pStreamPesudoFuncVals, 0);
103,007✔
2408
      memcpy(pTmp, pFirst, sizeof(*pTmp));
103,007✔
2409

2410
      STREAM_CHECK_RET_GOTO(streamCalcCurrWinTimeRange(node, &sStreamReaderCalcInfo->tmpRtFuncInfo, pWin, pValid, 1));
103,007✔
2411
      if (*pValid) {
103,007✔
2412
        int64_t skey = pWin->skey;
103,007✔
2413

2414
        memcpy(pTmp, pLast, sizeof(*pTmp));
103,007✔
2415
        STREAM_CHECK_RET_GOTO(streamCalcCurrWinTimeRange(node, &sStreamReaderCalcInfo->tmpRtFuncInfo, pWin, pValid, 2));
103,007✔
2416

2417
        if (*pValid) {
103,007✔
2418
          pWin->skey = skey;
103,007✔
2419
        }
2420
      }
2421
      pWin->ekey--;
103,007✔
2422
    }
2423
  } else {
2424
    if (!node->needCalc) {
101,061✔
2425
      SSTriggerCalcParam* pCurr = taosArrayGet(req->pStRtFuncInfo->pStreamPesudoFuncVals, req->pStRtFuncInfo->curIdx);
58,588✔
2426
      pWin->skey = pCurr->wstart;
58,588✔
2427
      pWin->ekey = pCurr->wend;
58,588✔
2428
      *pValid = true;
58,588✔
2429
      if (req->pStRtFuncInfo->triggerType == STREAM_TRIGGER_SLIDING) {
58,588✔
2430
        pWin->ekey--;
11,634✔
2431
      }
2432
    } else {
2433
      STREAM_CHECK_RET_GOTO(streamCalcCurrWinTimeRange(node, req->pStRtFuncInfo, pWin, pValid, 3));
42,473✔
2434
      pWin->ekey--;
42,473✔
2435
    }
2436
  }
2437

2438
  ST_TASK_DLOG("%s type:%s, withExternalWindow:%d, skey:%" PRId64 ", ekey:%" PRId64 ", validRange:%d", 
407,554✔
2439
      __func__, isExtWin ? "interp range" : "scan time range", req->pStRtFuncInfo->withExternalWindow, pWin->skey, pWin->ekey, *pValid);
2440

2441
end:
6,016✔
2442

2443
  if (code) {
407,554✔
2444
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2445
  }
2446
  
2447
  return code;
407,554✔
2448
}
2449

2450
static int32_t createDataBlockTsUid(SSDataBlock** pBlockRet, uint32_t numOfRows) {
419,930✔
2451
  int32_t      code = 0;
419,930✔
2452
  int32_t      lino = 0;
419,930✔
2453
  SSDataBlock* pBlock = NULL;
419,930✔
2454
  STREAM_CHECK_RET_GOTO(createDataBlock(&pBlock));
420,132✔
2455
  SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, PRIMARYKEY_TIMESTAMP_COL_ID);
419,911✔
2456
  STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
420,132✔
2457
  idata = createColumnInfoData(TSDB_DATA_TYPE_BIGINT, LONG_BYTES, PRIMARYKEY_TIMESTAMP_COL_ID + 1);
420,132✔
2458
  STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
420,132✔
2459
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, numOfRows));
419,911✔
2460

2461
end:
419,899✔
2462
  STREAM_PRINT_LOG_END(code, lino)
419,899✔
2463
  if (code != TSDB_CODE_SUCCESS) {
420,132✔
2464
    blockDataDestroy(pBlock);
×
2465
    pBlock = NULL;
×
2466
  }
2467
  *pBlockRet = pBlock;
420,132✔
2468
  return code;
419,899✔
2469
}
2470

2471
static int32_t processTsOutPutAllTables(SStreamTriggerReaderInfo* sStreamReaderInfo, SStreamTsResponse* tsRsp, SSDataBlock* pResBlock, int32_t order) {
246,605✔
2472
  int32_t code = 0;
246,605✔
2473
  int32_t lino = 0;
246,605✔
2474
  void* pTask = sStreamReaderInfo->pTask;
246,605✔
2475

2476
  tsRsp->tsInfo = taosArrayInit(pResBlock->info.rows, sizeof(STsInfo));
246,605✔
2477
  STREAM_CHECK_NULL_GOTO(tsRsp->tsInfo, terrno);
246,605✔
2478
  SColumnInfoData* pColInfoDataTs = taosArrayGet(pResBlock->pDataBlock, 0);
246,605✔
2479
  SColumnInfoData* pColInfoDataUid = taosArrayGet(pResBlock->pDataBlock, 1);
246,605✔
2480
  for (int32_t j = 0; j < pResBlock->info.rows; j++) {
656,274✔
2481
    if (colDataIsNull_s(pColInfoDataTs, j) || pColInfoDataTs->pData == NULL) {
819,338✔
2482
      continue;
×
2483
    }
2484
    STsInfo* tsInfo = taosArrayReserve(tsRsp->tsInfo, 1);
409,669✔
2485
    STREAM_CHECK_NULL_GOTO(tsInfo, terrno)
409,669✔
2486
    if (order == TSDB_ORDER_ASC) {
409,669✔
2487
      tsInfo->ts = INT64_MAX;
205,439✔
2488
    } else {
2489
      tsInfo->ts = INT64_MIN;
204,230✔
2490
    }
2491
    int64_t ts = *(int64_t*)colDataGetNumData(pColInfoDataTs, j);
409,669✔
2492
    if (order == TSDB_ORDER_ASC && ts < tsInfo->ts) {
409,669✔
2493
      tsInfo->ts = ts;
205,439✔
2494
    } else if (order == TSDB_ORDER_DESC && ts > tsInfo->ts) {
204,230✔
2495
      tsInfo->ts = ts;
204,230✔
2496
    }
2497
    tsInfo->gId = *(int64_t*)colDataGetNumData(pColInfoDataUid, j);
409,669✔
2498
    ST_TASK_DLOG("%s get ts:%" PRId64 ", gId:%" PRIu64 ", ver:%" PRId64, __func__, tsInfo->ts, tsInfo->gId, tsRsp->ver);
409,669✔
2499
  }
2500

2501
end:
246,605✔
2502
  return code;
246,605✔
2503
}
2504

2505
static int32_t processTsOutPutOneGroup(SStreamTriggerReaderInfo* sStreamReaderInfo, SStreamTsResponse* tsRsp, SSDataBlock* pResBlock, int32_t order) {
81,246✔
2506
  int32_t code = 0;
81,246✔
2507
  int32_t lino = 0;
81,246✔
2508
  void* pTask = sStreamReaderInfo->pTask;
81,246✔
2509

2510
  tsRsp->tsInfo = taosArrayInit(1, sizeof(STsInfo));
80,535✔
2511
  STREAM_CHECK_NULL_GOTO(tsRsp->tsInfo, terrno);
80,765✔
2512
  STsInfo* tsInfo = taosArrayReserve(tsRsp->tsInfo, 1);
80,302✔
2513
  STREAM_CHECK_NULL_GOTO(tsInfo, terrno)
81,476✔
2514
  if (order == TSDB_ORDER_ASC) {
81,476✔
2515
    tsInfo->ts = INT64_MAX;
65,554✔
2516
  } else {
2517
    tsInfo->ts = INT64_MIN;
15,922✔
2518
  }
2519

2520
  SColumnInfoData* pColInfoDataTs = taosArrayGet(pResBlock->pDataBlock, 0);
81,243✔
2521
  SColumnInfoData* pColInfoDataUid = taosArrayGet(pResBlock->pDataBlock, 1);
80,780✔
2522
  for (int32_t j = 0; j < pResBlock->info.rows; j++) {
176,837✔
2523
    if (colDataIsNull_s(pColInfoDataTs, j) || pColInfoDataTs->pData == NULL) {
192,565✔
2524
      continue;
×
2525
    }
2526
    int64_t ts = *(int64_t*)colDataGetNumData(pColInfoDataTs, j);
96,048✔
2527
    if (order == TSDB_ORDER_ASC && ts < tsInfo->ts) {
95,594✔
2528
      tsInfo->ts = ts;
65,554✔
2529
    } else if (order == TSDB_ORDER_DESC && ts > tsInfo->ts) {
30,736✔
2530
      tsInfo->ts = ts;
15,922✔
2531
    }
2532
  }
2533
  int64_t uid = *(int64_t*)colDataGetNumData(pColInfoDataUid, 0);
81,476✔
2534
  tsInfo->gId = qStreamGetGroupIdFromSet(sStreamReaderInfo, uid);
81,476✔
2535
  ST_TASK_DLOG("%s get ts:%" PRId64 ", gId:%" PRIu64 ", ver:%" PRId64, __func__, tsInfo->ts, tsInfo->gId, tsRsp->ver);
81,476✔
2536

2537
end:
30,241✔
2538
  return code;
81,476✔
2539
}
2540

2541
static int32_t processTsOutPutAllGroups(SStreamTriggerReaderInfo* sStreamReaderInfo, SStreamTsResponse* tsRsp, SSDataBlock* pResBlock, int32_t order) {
7,305✔
2542
  int32_t code = 0;
7,305✔
2543
  int32_t lino = 0;
7,305✔
2544
  STableKeyInfo* pList = NULL;
7,305✔
2545
  StreamTableListInfo     tableInfo = {0};
7,305✔
2546

2547
  void* pTask = sStreamReaderInfo->pTask;
7,305✔
2548
  STREAM_CHECK_RET_GOTO(qStreamCopyTableInfo(sStreamReaderInfo, &tableInfo));
7,305✔
2549

2550
  SSHashObj*   uidTsHash = tSimpleHashInit(pResBlock->info.rows, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
7,305✔
2551
  STREAM_CHECK_NULL_GOTO(uidTsHash, terrno);
7,305✔
2552
  SColumnInfoData* pColInfoDataTs = taosArrayGet(pResBlock->pDataBlock, 0);
7,305✔
2553
  SColumnInfoData* pColInfoDataUid = taosArrayGet(pResBlock->pDataBlock, 1);
7,305✔
2554
  for (int32_t j = 0; j < pResBlock->info.rows; j++) {
22,578✔
2555
    if (colDataIsNull_s(pColInfoDataTs, j) || pColInfoDataTs->pData == NULL) {
30,546✔
2556
      continue;
×
2557
    }
2558
    int64_t ts = *(int64_t*)colDataGetNumData(pColInfoDataTs, j);
15,273✔
2559
    int64_t uid = *(int64_t*)colDataGetNumData(pColInfoDataUid, j);
15,273✔
2560
    STREAM_CHECK_RET_GOTO(tSimpleHashPut(uidTsHash, &uid, LONG_BYTES, &ts, LONG_BYTES));
15,273✔
2561
  }
2562
  tsRsp->tsInfo = taosArrayInit(qStreamGetTableListGroupNum(sStreamReaderInfo), sizeof(STsInfo));
7,305✔
2563
  STREAM_CHECK_NULL_GOTO(tsRsp->tsInfo, terrno);
7,305✔
2564
  while (true) {
13,741✔
2565
    int32_t        pNum = 0;
21,046✔
2566
    int64_t        suid = 0;
21,046✔
2567
    STREAM_CHECK_RET_GOTO(qStreamIterTableList(&tableInfo, &pList, &pNum, &suid));
21,046✔
2568
    if(pNum == 0) break;
21,046✔
2569
    STsInfo* tsInfo = taosArrayReserve(tsRsp->tsInfo, 1);
14,314✔
2570
    STREAM_CHECK_NULL_GOTO(tsInfo, terrno)
14,314✔
2571
    if (order == TSDB_ORDER_ASC) {
14,314✔
2572
      tsInfo->ts = INT64_MAX;
6,685✔
2573
    } else {
2574
      tsInfo->ts = INT64_MIN;
7,629✔
2575
    }
2576
    for (int32_t i = 0; i < pNum; i++) {
29,183✔
2577
      int64_t uid = pList[i].uid;
15,442✔
2578
      int64_t *ts = tSimpleHashGet(uidTsHash, &uid, LONG_BYTES);
15,442✔
2579
      STREAM_CHECK_NULL_GOTO(ts, terrno);
15,442✔
2580
      if (order == TSDB_ORDER_ASC && *ts < tsInfo->ts) {
14,869✔
2581
        tsInfo->ts = *ts;
6,685✔
2582
      } else if (order == TSDB_ORDER_DESC && *ts > tsInfo->ts) {
8,184✔
2583
        tsInfo->ts = *ts;
7,056✔
2584
      }
2585
    }
2586
    int64_t uid = pList[0].uid;
13,741✔
2587
    tsInfo->gId = qStreamGetGroupIdFromSet(sStreamReaderInfo, uid);
13,741✔
2588
    ST_TASK_DLOG("%s get ts:%" PRId64 ", gId:%" PRIu64 ", ver:%" PRId64, __func__, tsInfo->ts, tsInfo->gId, tsRsp->ver);
13,741✔
2589
    taosMemoryFreeClear(pList);
13,741✔
2590
  }
2591

2592
end:
7,305✔
2593
  qStreamDestroyTableInfo(&tableInfo);
7,305✔
2594
  taosMemoryFreeClear(pList);
7,305✔
2595
  tSimpleHashCleanup(uidTsHash);
7,305✔
2596
  return code;
7,305✔
2597
}
2598

2599
// static bool stReaderTaskWaitQuit(SStreamTask* pTask) { return taosHasRWWFlag(&pTask->entryLock); }
2600

2601
static int32_t getAllTs(SVnode* pVnode, SSDataBlock*  pResBlock, SStreamReaderTaskInner* pTaskInner, STableKeyInfo* pList, int32_t pNum) {
327,943✔
2602
  int32_t code = 0;
327,943✔
2603
  int32_t lino = 0;
327,943✔
2604

2605
  STREAM_CHECK_RET_GOTO(pTaskInner->storageApi->tsdReader.tsdCreateFirstLastTsIter(pVnode, &pTaskInner->options->twindows, &(SVersionRange){.minVer = -1, .maxVer = pTaskInner->options->ver},
327,943✔
2606
                                                pTaskInner->options->suid, pList, pNum, pTaskInner->options->order, &pTaskInner->pReader, pTaskInner->idStr));
2607
  bool hasNext = true;
327,081✔
2608
  while(1){
2609
    STREAM_CHECK_RET_GOTO(pTaskInner->storageApi->tsdReader.tsdNextFirstLastTsBlock(pTaskInner->pReader, pResBlock, &hasNext));
327,081✔
2610
    STREAM_CHECK_CONDITION_GOTO(!hasNext, TDB_CODE_SUCCESS);
327,943✔
2611
  }
2612

2613
end:
327,943✔
2614
  pTaskInner->storageApi->tsdReader.tsdDestroyFirstLastTsIter(pTaskInner->pReader);
326,615✔
2615
  pTaskInner->pReader = NULL;
327,078✔
2616
  return code;
327,247✔
2617
}
2618

2619
static int32_t processTsVTable(SVnode* pVnode, SStreamTsResponse* tsRsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
201,283✔
2620
                                  SStreamReaderTaskInner* pTaskInner) {
2621
  int32_t code = 0;
201,283✔
2622
  int32_t lino = 0;
201,283✔
2623
  STableKeyInfo* pList = NULL;
201,283✔
2624
  StreamTableListInfo     tableInfo = {0};
201,283✔
2625

2626
  void* pTask = sStreamReaderInfo->pTask;
201,283✔
2627
  STREAM_CHECK_RET_GOTO(qStreamCopyTableInfo(sStreamReaderInfo, &tableInfo));
201,283✔
2628

2629
  SSDataBlock*  pResBlock = NULL;
201,283✔
2630
  STREAM_CHECK_RET_GOTO(createDataBlockTsUid(&pResBlock, qStreamGetTableListNum(sStreamReaderInfo)));
201,283✔
2631

2632
  while (true) {
109,094✔
2633
    int32_t        pNum = 0;
310,377✔
2634
    int64_t        suid = 0;
310,377✔
2635
    STREAM_CHECK_RET_GOTO(qStreamIterTableList(&tableInfo, &pList, &pNum, &suid));
310,377✔
2636
    if(pNum == 0) break;
310,377✔
2637
    pTaskInner->options->suid = suid;
109,094✔
2638
    STREAM_CHECK_RET_GOTO(getAllTs(pVnode, pResBlock, pTaskInner, pList, pNum));
109,094✔
2639
    taosMemoryFreeClear(pList);
108,882✔
2640
  }
2641

2642
  STREAM_CHECK_RET_GOTO(processTsOutPutAllTables(sStreamReaderInfo, tsRsp, pResBlock, pTaskInner->options->order));
201,283✔
2643

2644
end:
201,283✔
2645
  qStreamDestroyTableInfo(&tableInfo);
201,283✔
2646
  taosMemoryFreeClear(pList);
201,283✔
2647
  blockDataDestroy(pResBlock);
201,283✔
2648
  STREAM_PRINT_LOG_END_WITHID(code, lino);
201,283✔
2649
  return code;
201,283✔
2650
}
2651

2652
static int32_t processTsNonVTable(SVnode* pVnode, SStreamTsResponse* tsRsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
209,399✔
2653
                                  SStreamReaderTaskInner* pTaskInner) {
2654
  int32_t code = 0;
209,399✔
2655
  int32_t lino = 0;
209,399✔
2656
  STableKeyInfo* pList = NULL;
209,399✔
2657
  void* pTask = sStreamReaderInfo->pTask;
209,399✔
2658
  
2659
  SSDataBlock*  pResBlock = NULL;
209,399✔
2660

2661
  int32_t        pNum = 0;
209,399✔
2662
  int64_t        suid = 0;
209,399✔
2663
  STREAM_CHECK_RET_GOTO(qStreamGetTableList(sStreamReaderInfo, 0, &pList, &pNum));
209,399✔
2664
  STREAM_CHECK_CONDITION_GOTO(pNum == 0, TSDB_CODE_SUCCESS);
209,399✔
2665
  STREAM_CHECK_RET_GOTO(createDataBlockTsUid(&pResBlock, pNum));
177,518✔
2666

2667
  pTaskInner->options->suid = sStreamReaderInfo->suid;
177,518✔
2668
  STREAM_CHECK_RET_GOTO(getAllTs(pVnode, pResBlock, pTaskInner, pList, pNum));
177,518✔
2669
  STREAM_CHECK_CONDITION_GOTO(pResBlock->info.rows == 0, TDB_CODE_SUCCESS);
177,349✔
2670
  int32_t order = pTaskInner->options->order;
92,772✔
2671

2672
  if (sStreamReaderInfo->groupByTbname) {
92,772✔
2673
    STREAM_CHECK_RET_GOTO(processTsOutPutAllTables(sStreamReaderInfo, tsRsp, pResBlock, order));
45,322✔
2674
  } else if (sStreamReaderInfo->partitionCols == NULL) {
47,450✔
2675
    STREAM_CHECK_RET_GOTO(processTsOutPutOneGroup(sStreamReaderInfo, tsRsp, pResBlock, order));
40,145✔
2676
  } else {
2677
    STREAM_CHECK_RET_GOTO(processTsOutPutAllGroups(sStreamReaderInfo, tsRsp, pResBlock, order));
7,305✔
2678
  }                             
2679
end:
208,759✔
2680
  blockDataDestroy(pResBlock);
209,183✔
2681
  taosMemoryFreeClear(pList);
209,277✔
2682
  STREAM_PRINT_LOG_END_WITHID(code, lino);
209,230✔
2683
  return code;
209,230✔
2684
}
2685

2686
static int32_t processTsOnce(SVnode* pVnode, SStreamTsResponse* tsRsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
55,031✔
2687
                                  SStreamReaderTaskInner* pTaskInner, uint64_t gid) {
2688
  int32_t code = 0;
55,031✔
2689
  int32_t lino = 0;
55,031✔
2690
  STableKeyInfo* pList = NULL;
55,031✔
2691
  void* pTask = sStreamReaderInfo->pTask;
55,031✔
2692
  
2693
  SSDataBlock*  pResBlock = NULL;
55,031✔
2694

2695
  int32_t        pNum = 0;
55,031✔
2696
  STREAM_CHECK_RET_GOTO(qStreamGetTableList(sStreamReaderInfo, gid, &pList, &pNum));
55,031✔
2697
  STREAM_CHECK_CONDITION_GOTO(pNum == 0, TSDB_CODE_SUCCESS);
55,031✔
2698
  STREAM_CHECK_RET_GOTO(createDataBlockTsUid(&pResBlock, pNum));
41,331✔
2699

2700
  pTaskInner->options->suid = sStreamReaderInfo->suid;
41,331✔
2701
  STREAM_CHECK_RET_GOTO(getAllTs(pVnode, pResBlock, pTaskInner, pList, pNum));
41,331✔
2702
  STREAM_CHECK_CONDITION_GOTO(pResBlock->info.rows == 0, TDB_CODE_SUCCESS);
41,331✔
2703
  int32_t order = pTaskInner->options->order;
40,408✔
2704

2705
  STREAM_CHECK_RET_GOTO(processTsOutPutOneGroup(sStreamReaderInfo, tsRsp, pResBlock, order));
40,635✔
2706
end:
55,031✔
2707
  blockDataDestroy(pResBlock);
55,031✔
2708
  taosMemoryFreeClear(pList);
54,341✔
2709
  STREAM_PRINT_LOG_END_WITHID(code, lino);
54,341✔
2710
  return code;
54,341✔
2711
}
2712

2713
static int32_t processTs(SVnode* pVnode, SStreamTsResponse* tsRsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
410,451✔
2714
                                  SStreamReaderTaskInner* pTaskInner) {
2715
  if (sStreamReaderInfo->isVtableStream) {
410,451✔
2716
    return processTsVTable(pVnode, tsRsp, sStreamReaderInfo, pTaskInner);
201,283✔
2717
  }
2718

2719
  return processTsNonVTable(pVnode, tsRsp, sStreamReaderInfo, pTaskInner);
209,168✔
2720
}
2721

2722
static int32_t vnodeProcessStreamSetTableReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
116,568✔
2723
  int32_t code = 0;
116,568✔
2724
  int32_t lino = 0;
116,568✔
2725
  void*   buf = NULL;
116,568✔
2726
  size_t  size = 0;
116,568✔
2727
  void* pTask = sStreamReaderInfo->pTask;
116,568✔
2728

2729
  ST_TASK_DLOG("vgId:%d %s start, trigger hash size:%d, calc hash size:%d", TD_VID(pVnode), __func__,
116,568✔
2730
                tSimpleHashGetSize(req->setTableReq.uidInfoTrigger), tSimpleHashGetSize(req->setTableReq.uidInfoCalc));
2731

2732
  taosWLockLatch(&sStreamReaderInfo->lock);
116,568✔
2733
  TSWAP(sStreamReaderInfo->uidHashTrigger, req->setTableReq.uidInfoTrigger);
116,568✔
2734
  TSWAP(sStreamReaderInfo->uidHashCalc, req->setTableReq.uidInfoCalc);
116,568✔
2735
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidHashTrigger, TSDB_CODE_INVALID_PARA);
116,568✔
2736
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidHashCalc, TSDB_CODE_INVALID_PARA);
116,568✔
2737

2738
  STREAM_CHECK_RET_GOTO(initStreamTableListInfo(&sStreamReaderInfo->vSetTableList));
116,568✔
2739
  STREAM_CHECK_RET_GOTO(qBuildVTableList(sStreamReaderInfo));
116,568✔
2740
end:
116,568✔
2741
  taosWUnLockLatch(&sStreamReaderInfo->lock);
116,568✔
2742
  STREAM_PRINT_LOG_END_WITHID(code, lino);
116,568✔
2743
  SRpcMsg rsp = {
116,568✔
2744
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
2745
  tmsgSendRsp(&rsp);
116,568✔
2746
  return code;
116,568✔
2747
}
2748

2749
static int32_t vnodeProcessStreamLastTsReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
258,626✔
2750
  int32_t                 code = 0;
258,626✔
2751
  int32_t                 lino = 0;
258,626✔
2752
  SStreamReaderTaskInner* pTaskInner = NULL;
258,626✔
2753
  SStreamTsResponse       tsRsp = {0};
258,836✔
2754
  void*                   buf = NULL;
258,836✔
2755
  size_t                  size = 0;
258,836✔
2756

2757
  void* pTask = sStreamReaderInfo->pTask;
258,836✔
2758

2759
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
258,836✔
2760

2761
  BUILD_OPTION(options, 0, sStreamReaderInfo->tableList.version, TSDB_ORDER_DESC, INT64_MIN, INT64_MAX, NULL, false, NULL);
258,836✔
2762
  STREAM_CHECK_RET_GOTO(createStreamTaskForTs(&options, &pTaskInner, &sStreamReaderInfo->storageApi));
258,836✔
2763

2764
  tsRsp.ver = sStreamReaderInfo->tableList.version + 1;
258,605✔
2765

2766
  STREAM_CHECK_RET_GOTO(processTs(pVnode, &tsRsp, sStreamReaderInfo, pTaskInner));
258,605✔
2767
  
2768
end:
258,836✔
2769
  ST_TASK_DLOG("vgId:%d %s get result size:%"PRIzu", ver:%"PRId64, TD_VID(pVnode), __func__, taosArrayGetSize(tsRsp.tsInfo), tsRsp.ver);
258,836✔
2770
  code = buildTsRsp(&tsRsp, &buf, &size);
258,836✔
2771
  STREAM_PRINT_LOG_END_WITHID(code, lino);
258,836✔
2772
  SRpcMsg rsp = {
258,836✔
2773
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
2774
  tmsgSendRsp(&rsp);
258,836✔
2775
  taosArrayDestroy(tsRsp.tsInfo);
258,836✔
2776
  taosMemoryFree(pTaskInner);
258,836✔
2777
  return code;
258,836✔
2778
}
2779

2780
static int32_t vnodeProcessStreamFirstTsReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
206,877✔
2781
  int32_t                 code = 0;
206,877✔
2782
  int32_t                 lino = 0;
206,877✔
2783
  SStreamReaderTaskInner* pTaskInner = NULL;
206,877✔
2784
  SStreamTsResponse       tsRsp = {0};
206,877✔
2785
  void*                   buf = NULL;
206,877✔
2786
  size_t                  size = 0;
206,877✔
2787

2788
  void* pTask = sStreamReaderInfo->pTask;
206,877✔
2789
  ST_TASK_DLOG("vgId:%d %s start, startTime:%"PRId64" ver:%"PRId64" gid:%"PRId64, TD_VID(pVnode), __func__, req->firstTsReq.startTime, req->firstTsReq.ver, req->firstTsReq.gid);
206,877✔
2790
  int32_t        pNum = 0;
206,877✔
2791

2792
  tsRsp.ver = pVnode->state.applied;
206,877✔
2793

2794
  BUILD_OPTION(options, 0, req->firstTsReq.ver, TSDB_ORDER_ASC, req->firstTsReq.startTime, INT64_MAX, NULL, false, NULL);
206,877✔
2795
  STREAM_CHECK_RET_GOTO(createStreamTaskForTs(&options, &pTaskInner, &sStreamReaderInfo->storageApi));
206,877✔
2796

2797
  if (req->firstTsReq.gid != 0) {
206,877✔
2798
    STREAM_CHECK_RET_GOTO(processTsOnce(pVnode, &tsRsp, sStreamReaderInfo, pTaskInner, req->firstTsReq.gid));
55,031✔
2799
  } else {
2800
    STREAM_CHECK_RET_GOTO(processTs(pVnode, &tsRsp, sStreamReaderInfo, pTaskInner));
151,846✔
2801
  }
2802

2803
end:
206,877✔
2804
  ST_TASK_DLOG("vgId:%d %s get result size:%"PRIzu", ver:%"PRId64, TD_VID(pVnode), __func__, taosArrayGetSize(tsRsp.tsInfo), tsRsp.ver);
206,877✔
2805
  code = buildTsRsp(&tsRsp, &buf, &size);
206,877✔
2806
  STREAM_PRINT_LOG_END_WITHID(code, lino);
206,647✔
2807
  SRpcMsg rsp = {
206,647✔
2808
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
2809
  tmsgSendRsp(&rsp);
206,414✔
2810
  taosArrayDestroy(tsRsp.tsInfo);
206,877✔
2811
  taosMemoryFree(pTaskInner);
206,411✔
2812
  return code;
206,644✔
2813
}
2814

2815
static int32_t vnodeProcessStreamTsdbMetaReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
509,868✔
2816
  int32_t code = 0;
509,868✔
2817
  int32_t lino = 0;
509,868✔
2818
  void*   buf = NULL;
509,868✔
2819
  size_t  size = 0;
510,530✔
2820
  STableKeyInfo* pList = NULL;
510,530✔
2821

2822
  void* pTask = sStreamReaderInfo->pTask;
510,530✔
2823
  ST_TASK_DLOG("vgId:%d %s start, ver:%" PRId64 ",skey:%" PRId64 ",ekey:%" PRId64 ",gid:%" PRId64, TD_VID(pVnode),
510,530✔
2824
               __func__, req->tsdbMetaReq.ver, req->tsdbMetaReq.startTime, req->tsdbMetaReq.endTime,
2825
               req->tsdbMetaReq.gid);
2826

2827
  SStreamReaderTaskInner* pTaskInner = NULL;
510,530✔
2828
  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_META);
510,530✔
2829

2830
  if (req->base.type == STRIGGER_PULL_TSDB_META) {
510,530✔
2831
    int32_t        pNum = 0;
510,530✔
2832
    STREAM_CHECK_RET_GOTO(qStreamGetTableList(sStreamReaderInfo, req->tsdbMetaReq.gid, &pList, &pNum));
510,530✔
2833
    BUILD_OPTION(options, getSuid(sStreamReaderInfo, pList), req->tsdbMetaReq.ver, req->tsdbMetaReq.order, req->tsdbMetaReq.startTime, req->tsdbMetaReq.endTime, 
510,530✔
2834
                          sStreamReaderInfo->tsSchemas, true, NULL);
2835
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, pList, pNum, &sStreamReaderInfo->storageApi));
510,530✔
2836
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
510,530✔
2837
    
2838
    STREAM_CHECK_RET_GOTO(createBlockForTsdbMeta(&pTaskInner->pResBlockDst, sStreamReaderInfo->isVtableStream));
510,530✔
2839
  } else {
2840
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
×
2841
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
×
2842
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
×
2843
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
×
2844
  }
2845

2846
  blockDataCleanup(pTaskInner->pResBlockDst);
510,530✔
2847
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pTaskInner->pResBlockDst, STREAM_RETURN_ROWS_NUM));
510,530✔
2848
  bool hasNext = true;
510,530✔
2849
  while (true) {
232,855✔
2850
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
743,385✔
2851
    if (!hasNext) {
743,385✔
2852
      break;
510,530✔
2853
    }
2854
    pTaskInner->storageApi->tsdReader.tsdReaderReleaseDataBlock(pTaskInner->pReader);
232,855✔
2855
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupIdFromSet(sStreamReaderInfo, pTaskInner->pResBlock->info.id.uid);
232,855✔
2856

2857
    int32_t index = 0;
232,855✔
2858
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.window.skey));
232,855✔
2859
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.window.ekey));
232,617✔
2860
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.id.uid));
232,736✔
2861
    if (!sStreamReaderInfo->isVtableStream) {
232,736✔
2862
      STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.id.groupId));
70,007✔
2863
    }
2864
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.rows));
232,736✔
2865

2866
    stDebug("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
232,617✔
2867
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
2868
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
2869
            pTaskInner->pResBlockDst->info.rows++;
232,855✔
2870
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
232,855✔
2871
      break;
×
2872
    }
2873
  }
2874

2875
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
510,530✔
2876
  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
510,530✔
2877
  printDataBlock(pTaskInner->pResBlockDst, __func__, "meta", ((SStreamTask *)sStreamReaderInfo->pTask)->streamId);
510,530✔
2878
  if (!hasNext) {
510,530✔
2879
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
510,530✔
2880
  }
2881

2882
end:
510,530✔
2883
  STREAM_PRINT_LOG_END_WITHID(code, lino);
510,328✔
2884
  SRpcMsg rsp = {
510,530✔
2885
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
2886
  tmsgSendRsp(&rsp);
510,530✔
2887
  taosMemoryFree(pList);
510,530✔
2888
  return code;
510,530✔
2889
}
2890

2891
static int32_t vnodeProcessStreamTsdbTsDataReqNonVTable(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
63,884✔
2892
  int32_t                 code = 0;
63,884✔
2893
  int32_t                 lino = 0;
63,884✔
2894
  SStreamReaderTaskInner* pTaskInner = NULL;
63,884✔
2895
  void*                   buf = NULL;
63,884✔
2896
  size_t                  size = 0;
63,884✔
2897
  SSDataBlock*            pBlockRes = NULL;
63,884✔
2898

2899
  void* pTask = sStreamReaderInfo->pTask;
63,884✔
2900
  ST_TASK_DLOG("vgId:%d %s start, ver:%"PRId64",skey:%"PRId64",ekey:%"PRId64",uid:%"PRId64",suid:%"PRId64, TD_VID(pVnode), __func__, req->tsdbTsDataReq.ver, 
63,884✔
2901
                req->tsdbTsDataReq.skey, req->tsdbTsDataReq.ekey, 
2902
                req->tsdbTsDataReq.uid, req->tsdbTsDataReq.suid);
2903

2904
  int32_t        pNum = 1;
63,884✔
2905
  STableKeyInfo  pList = {.groupId = qStreamGetGroupIdFromSet(sStreamReaderInfo, req->tsdbTsDataReq.uid), .uid = req->tsdbTsDataReq.uid};
63,884✔
2906
  STREAM_CHECK_CONDITION_GOTO(pList.groupId == -1, TSDB_CODE_INVALID_PARA);
63,884✔
2907
  BUILD_OPTION(options, getSuid(sStreamReaderInfo, &pList), req->tsdbTsDataReq.ver, TSDB_ORDER_ASC, req->tsdbTsDataReq.skey, req->tsdbTsDataReq.ekey,
63,884✔
2908
               sStreamReaderInfo->triggerCols, false, NULL);
2909
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, &pList, pNum, &sStreamReaderInfo->storageApi));
63,884✔
2910
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
63,884✔
2911
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->tsBlock, false, &pBlockRes));
63,884✔
2912

2913
  while (1) {
63,884✔
2914
    bool hasNext = false;
127,768✔
2915
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
127,768✔
2916
    if (!hasNext) {
127,768✔
2917
      break;
63,884✔
2918
    }
2919
    // if (!sStreamReaderInfo->isVtableStream){
2920
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupIdFromSet(sStreamReaderInfo, pTaskInner->pResBlock->info.id.uid);
63,884✔
2921
    // }
2922

2923
    SSDataBlock* pBlock = NULL;
63,884✔
2924
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
63,884✔
2925
    if (pBlock != NULL && pBlock->info.rows > 0) {
63,884✔
2926
      STREAM_CHECK_RET_GOTO(processTag(sStreamReaderInfo, false, pBlock->info.id.uid, pBlock,
63,884✔
2927
          0, pBlock->info.rows, 1));
2928
    }
2929
    
2930
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, sStreamReaderInfo->pFilterInfo, NULL));
63,884✔
2931
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
63,884✔
2932
    ST_TASK_DLOG("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
63,884✔
2933
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
2934
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
2935
  }
2936

2937
  blockDataTransform(pBlockRes, pTaskInner->pResBlockDst);
63,884✔
2938

2939
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
63,884✔
2940
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));
63,884✔
2941

2942
end:
63,884✔
2943
  STREAM_PRINT_LOG_END_WITHID(code, lino);
63,884✔
2944
  SRpcMsg rsp = {
63,884✔
2945
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
2946
  tmsgSendRsp(&rsp);
63,884✔
2947
  blockDataDestroy(pBlockRes);
63,884✔
2948

2949
  releaseStreamTask(&pTaskInner);
63,884✔
2950
  return code;
63,884✔
2951
}
2952

2953
static int32_t vnodeProcessStreamTsdbTsDataReqVTable(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
×
2954
  int32_t                 code = 0;
×
2955
  int32_t                 lino = 0;
×
2956
  SStreamReaderTaskInner* pTaskInner = NULL;
×
2957
  void*                   buf = NULL;
×
2958
  size_t                  size = 0;
×
2959
  SSDataBlock*            pBlockRes = NULL;
×
2960

2961
  void* pTask = sStreamReaderInfo->pTask;
×
2962
  ST_TASK_DLOG("vgId:%d %s start, ver:%"PRId64",skey:%"PRId64",ekey:%"PRId64",uid:%"PRId64",suid:%"PRId64, TD_VID(pVnode), __func__, req->tsdbTsDataReq.ver, 
×
2963
                req->tsdbTsDataReq.skey, req->tsdbTsDataReq.ekey, 
2964
                req->tsdbTsDataReq.uid, req->tsdbTsDataReq.suid);
2965

2966
  int32_t        pNum = 1;
×
2967
  STableKeyInfo  pList = {.groupId = qStreamGetGroupIdFromSet(sStreamReaderInfo, req->tsdbTsDataReq.uid), .uid = req->tsdbTsDataReq.uid};
×
2968
  STREAM_CHECK_CONDITION_GOTO(pList.groupId == -1, TSDB_CODE_INVALID_PARA);
×
2969
  BUILD_OPTION(options, getSuid(sStreamReaderInfo, &pList), req->tsdbTsDataReq.ver, TSDB_ORDER_ASC, req->tsdbTsDataReq.skey, req->tsdbTsDataReq.ekey,
×
2970
               sStreamReaderInfo->tsSchemas, true, NULL);
2971
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->tsBlock, &pList, pNum, &sStreamReaderInfo->storageApi));
×
2972
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->tsBlock, false, &pBlockRes));
×
2973

2974
  while (1) {
×
2975
    bool hasNext = false;
×
2976
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
×
2977
    if (!hasNext) {
×
2978
      break;
×
2979
    }
2980

2981
    SSDataBlock* pBlock = NULL;
×
2982
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
×
2983
    STREAM_CHECK_RET_GOTO(blockDataMerge(pBlockRes, pBlock));
×
2984
    ST_TASK_DLOG("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
×
2985
            TD_VID(pVnode), __func__, pBlockRes->info.window.skey, pBlockRes->info.window.ekey,
2986
            pBlockRes->info.id.uid, pBlockRes->info.id.groupId, pBlockRes->info.rows);
2987
  }
2988

2989
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlockRes->info.rows);
×
2990
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));
×
2991

2992
end:
×
2993
  STREAM_PRINT_LOG_END_WITHID(code, lino);
×
2994
  SRpcMsg rsp = {
×
2995
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
2996
  tmsgSendRsp(&rsp);
×
2997
  blockDataDestroy(pBlockRes);
×
2998

2999
  releaseStreamTask(&pTaskInner);
×
3000
  return code;
×
3001
}
3002

3003
static int32_t vnodeProcessStreamTsdbTriggerDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
111,140✔
3004
  int32_t code = 0;
111,140✔
3005
  int32_t lino = 0;
111,140✔
3006
  void*   buf = NULL;
111,140✔
3007
  size_t  size = 0;
111,140✔
3008
  STableKeyInfo* pList = NULL;
111,140✔
3009
  SArray*        pResList = NULL;
111,140✔
3010
  SSDataBlock*   pBlockTmp = NULL;
111,140✔
3011

3012
  SStreamReaderTaskInner* pTaskInner = NULL;
111,140✔
3013
  void* pTask = sStreamReaderInfo->pTask;
111,140✔
3014
  ST_TASK_DLOG("vgId:%d %s start. ver:%"PRId64",order:%d,startTs:%"PRId64",gid:%"PRId64, TD_VID(pVnode), __func__, req->tsdbTriggerDataReq.ver, req->tsdbTriggerDataReq.order, req->tsdbTriggerDataReq.startTime, req->tsdbTriggerDataReq.gid);
111,140✔
3015
  
3016
  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_TRIGGER_DATA);
111,140✔
3017

3018
  if (req->base.type == STRIGGER_PULL_TSDB_TRIGGER_DATA) {
111,140✔
3019
    int32_t        pNum = 0;
55,570✔
3020
    STREAM_CHECK_RET_GOTO(qStreamGetTableList(sStreamReaderInfo, req->tsdbTriggerDataReq.gid, &pList, &pNum));
55,570✔
3021
    BUILD_OPTION(options, getSuid(sStreamReaderInfo, pList), req->tsdbTriggerDataReq.ver, req->tsdbTriggerDataReq.order, req->tsdbTriggerDataReq.startTime, INT64_MAX,
55,570✔
3022
                 sStreamReaderInfo->triggerCols, false, NULL);
3023
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, pList, pNum, &sStreamReaderInfo->storageApi));
55,570✔
3024
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
55,570✔
3025
  } else {
3026
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
55,570✔
3027
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
55,570✔
3028
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
×
3029
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
×
3030
  }
3031

3032
  blockDataCleanup(pTaskInner->pResBlockDst);
55,570✔
3033
  bool hasNext = true;
55,570✔
3034
  int32_t totalRows = 0;
55,570✔
3035
    
3036
  pResList = taosArrayInit(4, POINTER_BYTES);
55,570✔
3037
  STREAM_CHECK_NULL_GOTO(pResList, terrno);
55,570✔
3038
  while (1) {
60,742✔
3039
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
116,312✔
3040
    if (!hasNext) {
116,312✔
3041
      break;
55,570✔
3042
    }
3043
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupIdFromSet(sStreamReaderInfo, pTaskInner->pResBlock->info.id.uid);
60,742✔
3044
    // pTaskInner->pResBlockDst->info.id.groupId = pTaskInner->pResBlock->info.id.groupId;
3045

3046
    SSDataBlock* pBlock = NULL;
60,742✔
3047
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
60,742✔
3048
    if (pBlock != NULL && pBlock->info.rows > 0) {
60,742✔
3049
      STREAM_CHECK_RET_GOTO(
60,742✔
3050
        processTag(sStreamReaderInfo, false, pBlock->info.id.uid, pBlock, 0, pBlock->info.rows, 1));
3051
    }
3052
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, sStreamReaderInfo->pFilterInfo, NULL));
60,742✔
3053
    // STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
3054
    ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlock->info.rows);
60,742✔
3055
    STREAM_CHECK_RET_GOTO(createOneDataBlock(pBlock, true, &pBlockTmp));
60,742✔
3056
    STREAM_CHECK_NULL_GOTO(taosArrayPush(pResList, &pBlockTmp), terrno);
60,742✔
3057
    totalRows += blockDataGetNumOfRows(pBlockTmp);
60,742✔
3058
    pBlockTmp = NULL;
60,742✔
3059

3060
    ST_TASK_DLOG("vgId:%d %s get skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
60,742✔
3061
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
3062
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
3063
    if (totalRows >= STREAM_RETURN_ROWS_NUM) {  //todo optimize send multi blocks in one group
60,742✔
3064
      break;
×
3065
    }
3066
  }
3067

3068
  STREAM_CHECK_RET_GOTO(buildArrayRsp(pResList, &buf, &size));
55,570✔
3069
  if (!hasNext) {
55,337✔
3070
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
55,570✔
3071
  }
3072

3073
end:
110,674✔
3074
  STREAM_PRINT_LOG_END_WITHID(code, lino);
110,677✔
3075
  SRpcMsg rsp = {
110,677✔
3076
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
3077
  tmsgSendRsp(&rsp);
110,677✔
3078
  taosMemoryFree(pList);
111,140✔
3079
  blockDataDestroy(pBlockTmp);
111,140✔
3080
  taosArrayDestroyP(pResList, (FDelete)blockDataDestroy);
111,140✔
3081
  return code;
111,140✔
3082
}
3083

3084
static int32_t vnodeProcessStreamTsdbCalcDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
6,220,618✔
3085
  int32_t code = 0;
6,220,618✔
3086
  int32_t lino = 0;
6,220,618✔
3087
  void*   buf = NULL;
6,220,618✔
3088
  size_t  size = 0;
6,220,618✔
3089
  SSDataBlock*   pBlockRes = NULL;
6,220,618✔
3090
  STableKeyInfo* pList = NULL;
6,220,618✔
3091

3092

3093
  void* pTask = sStreamReaderInfo->pTask;
6,220,618✔
3094
  ST_TASK_DLOG("vgId:%d %s start, skey:%"PRId64",ekey:%"PRId64",gid:%"PRId64",ver:%"PRId64, TD_VID(pVnode), __func__, 
6,220,618✔
3095
    req->tsdbCalcDataReq.skey, req->tsdbCalcDataReq.ekey, req->tsdbCalcDataReq.gid, req->tsdbCalcDataReq.ver);
3096

3097
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->triggerCols, TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN);
6,220,618✔
3098

3099
  SStreamReaderTaskInner* pTaskInner = NULL;
6,220,618✔
3100
  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_CALC_DATA);
6,220,618✔
3101

3102
  if (req->base.type == STRIGGER_PULL_TSDB_CALC_DATA) {
6,220,618✔
3103
    int32_t        pNum = 0;
6,220,618✔
3104
    STREAM_CHECK_RET_GOTO(qStreamGetTableList(sStreamReaderInfo, req->tsdbCalcDataReq.gid, &pList, &pNum));
6,220,618✔
3105
    BUILD_OPTION(options, getSuid(sStreamReaderInfo, pList), req->tsdbCalcDataReq.ver, TSDB_ORDER_ASC, req->tsdbCalcDataReq.skey, req->tsdbCalcDataReq.ekey,
6,220,618✔
3106
                 sStreamReaderInfo->triggerCols, false, NULL);
3107
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, pList, pNum, &sStreamReaderInfo->storageApi));
6,220,618✔
3108

3109
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
6,216,928✔
3110
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
6,219,183✔
3111
  } else {
3112
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
×
3113
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
×
3114
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
×
3115
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
×
3116
  }
3117

3118
  blockDataCleanup(pTaskInner->pResBlockDst);
6,219,183✔
3119
  bool hasNext = true;
6,220,003✔
3120
  while (1) {
759,779✔
3121
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
6,978,962✔
3122
    if (!hasNext) {
6,975,514✔
3123
      break;
6,216,108✔
3124
    }
3125
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupIdFromSet(sStreamReaderInfo, pTaskInner->pResBlock->info.id.uid);
759,406✔
3126

3127
    SSDataBlock* pBlock = NULL;
759,779✔
3128
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
759,779✔
3129
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, sStreamReaderInfo->pFilterInfo, NULL));
759,779✔
3130
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
759,591✔
3131
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
759,591✔
3132
      break;
×
3133
    }
3134
  }
3135

3136
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->calcResBlock, false, &pBlockRes));
6,216,723✔
3137
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlockRes, pTaskInner->pResBlockDst->info.capacity));
6,220,003✔
3138
  blockDataTransform(pBlockRes, pTaskInner->pResBlockDst);
6,219,388✔
3139
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));
6,220,003✔
3140
  printDataBlock(pBlockRes, __func__, "tsdb_calc_data", ((SStreamTask*)pTask)->streamId);
6,217,560✔
3141
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlockRes->info.rows);
6,215,920✔
3142
  printDataBlock(pBlockRes, __func__, "tsdb_data", ((SStreamTask*)pTask)->streamId);
6,220,618✔
3143

3144
  if (!hasNext) {
6,220,413✔
3145
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
6,220,413✔
3146
  }
3147

3148
end:
6,219,798✔
3149
  STREAM_PRINT_LOG_END_WITHID(code, lino);
6,220,208✔
3150
  SRpcMsg rsp = {
6,221,233✔
3151
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
3152
  tmsgSendRsp(&rsp);
6,220,618✔
3153
  blockDataDestroy(pBlockRes);
6,220,208✔
3154
  taosMemoryFree(pList);
6,220,413✔
3155
  return code;
6,220,413✔
3156
}
3157

3158
static int32_t vnodeProcessStreamTsdbVirtalDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
347,460✔
3159
  int32_t code = 0;
347,460✔
3160
  int32_t lino = 0;
347,460✔
3161
  void*   buf = NULL;
347,460✔
3162
  size_t  size = 0;
347,460✔
3163
  int32_t* slotIdList = NULL;
347,460✔
3164
  SArray* sortedCid = NULL;
347,460✔
3165
  SArray* schemas = NULL;
347,460✔
3166
  SSDataBlock*   pBlockRes = NULL;
347,460✔
3167
  
3168
  void* pTask = sStreamReaderInfo->pTask;
347,460✔
3169
  ST_TASK_DLOG("vgId:%d %s start, skey:%"PRId64",ekey:%"PRId64",uid:%"PRId64",ver:%"PRId64, TD_VID(pVnode), __func__, 
347,460✔
3170
    req->tsdbDataReq.skey, req->tsdbDataReq.ekey, req->tsdbDataReq.uid, req->tsdbDataReq.ver);
3171
    
3172
  SStreamReaderTaskInner* pTaskInner = NULL;
347,460✔
3173
  int64_t key = req->tsdbDataReq.uid;
347,460✔
3174

3175
  if (req->base.type == STRIGGER_PULL_TSDB_DATA) {
347,460✔
3176
    // sort cid and build slotIdList
3177
    slotIdList = taosMemoryMalloc(taosArrayGetSize(req->tsdbDataReq.cids) * sizeof(int32_t));
347,460✔
3178
    STREAM_CHECK_NULL_GOTO(slotIdList, terrno);
347,460✔
3179
    sortedCid = taosArrayDup(req->tsdbDataReq.cids, NULL);
347,460✔
3180
    STREAM_CHECK_NULL_GOTO(sortedCid, terrno);
347,460✔
3181
    taosArraySort(sortedCid, sortCid);
347,460✔
3182
    for (int32_t i = 0; i < taosArrayGetSize(req->tsdbDataReq.cids); i++) {
1,349,774✔
3183
      int16_t* cid = taosArrayGet(req->tsdbDataReq.cids, i);
1,002,314✔
3184
      STREAM_CHECK_NULL_GOTO(cid, terrno);
1,002,314✔
3185
      for (int32_t j = 0; j < taosArrayGetSize(sortedCid); j++) {
1,968,826✔
3186
        int16_t* cidSorted = taosArrayGet(sortedCid, j);
1,968,826✔
3187
        STREAM_CHECK_NULL_GOTO(cidSorted, terrno);
1,968,826✔
3188
        if (*cid == *cidSorted) {
1,968,826✔
3189
          slotIdList[j] = i;
1,002,314✔
3190
          break;
1,002,314✔
3191
        }
3192
      }
3193
    }
3194

3195
    STREAM_CHECK_RET_GOTO(buildScheamFromMeta(pVnode, req->tsdbDataReq.uid, &schemas, &sStreamReaderInfo->storageApi));
347,460✔
3196
    STREAM_CHECK_RET_GOTO(shrinkScheams(req->tsdbDataReq.cids, schemas));
345,474✔
3197
    STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, &pBlockRes));
345,474✔
3198

3199
    taosArraySort(schemas, sortSSchema);
345,474✔
3200
    BUILD_OPTION(options, req->tsdbDataReq.suid, req->tsdbDataReq.ver, req->tsdbDataReq.order, req->tsdbDataReq.skey,
345,474✔
3201
                    req->tsdbDataReq.ekey, schemas, true, &slotIdList);
3202
    STableKeyInfo       keyInfo = {.uid = req->tsdbDataReq.uid, .groupId = 0};
345,474✔
3203
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, pBlockRes, &keyInfo, 1, &sStreamReaderInfo->storageApi));
345,474✔
3204
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
345,474✔
3205
    pTaskInner->pResBlockDst = pBlockRes;
345,474✔
3206
    pBlockRes = NULL;
345,474✔
3207
  } else {
3208
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
×
3209
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
×
3210
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
×
3211
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
×
3212
  }
3213

3214
  blockDataCleanup(pTaskInner->pResBlockDst);
345,474✔
3215
  bool hasNext = true;
345,474✔
3216
  while (1) {
345,474✔
3217
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
690,948✔
3218
    if (!hasNext) {
690,948✔
3219
      break;
345,474✔
3220
    }
3221

3222
    SSDataBlock* pBlock = NULL;
345,474✔
3223
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
345,474✔
3224
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
345,474✔
3225
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
345,474✔
3226
      break;
×
3227
    }
3228
  }
3229
  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
345,474✔
3230
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
345,474✔
3231
  printDataBlock(pTaskInner->pResBlockDst, __func__, "tsdb_data", ((SStreamTask*)pTask)->streamId);
345,474✔
3232
  if (!hasNext) {
345,474✔
3233
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
345,474✔
3234
  }
3235

3236
end:
347,460✔
3237
  STREAM_PRINT_LOG_END_WITHID(code, lino);
347,460✔
3238
  SRpcMsg rsp = {
347,460✔
3239
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
3240
  tmsgSendRsp(&rsp);
347,460✔
3241
  taosMemFree(slotIdList);
347,460✔
3242
  taosArrayDestroy(sortedCid);
347,460✔
3243
  taosArrayDestroy(schemas);
347,460✔
3244
  blockDataDestroy(pBlockRes);
347,460✔
3245
  return code;
347,460✔
3246
}
3247

3248
static int32_t vnodeProcessStreamWalMetaNewReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
11,374,761✔
3249
  int32_t      code = 0;
11,374,761✔
3250
  int32_t      lino = 0;
11,374,761✔
3251
  void*        buf = NULL;
11,374,761✔
3252
  size_t       size = 0;
11,374,761✔
3253
  int64_t      lastVer = 0;
11,374,761✔
3254
  SSTriggerWalNewRsp resultRsp = {0};
11,374,761✔
3255

3256
  void* pTask = sStreamReaderInfo->pTask;
11,376,056✔
3257
  ST_TASK_DLOG("vgId:%d %s start, request paras lastVer:%" PRId64, TD_VID(pVnode), __func__, req->walMetaNewReq.lastVer);
11,375,388✔
3258

3259
  if (sStreamReaderInfo->metaBlock == NULL) {
11,375,590✔
3260
    STREAM_CHECK_RET_GOTO(createBlockForWalMetaNew((SSDataBlock**)&sStreamReaderInfo->metaBlock));
174,375✔
3261
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(sStreamReaderInfo->metaBlock, STREAM_RETURN_ROWS_NUM));
174,375✔
3262
  }
3263
  blockDataEmpty(sStreamReaderInfo->metaBlock);
11,376,286✔
3264
  resultRsp.metaBlock = sStreamReaderInfo->metaBlock;
11,371,794✔
3265
  resultRsp.ver = req->walMetaNewReq.lastVer;
11,371,794✔
3266
  STREAM_CHECK_RET_GOTO(processWalVerMetaNew(pVnode, &resultRsp, sStreamReaderInfo, req->walMetaNewReq.ctime));
11,372,024✔
3267

3268
  ST_TASK_DLOG("vgId:%d %s get result last ver:%"PRId64" rows:%d", TD_VID(pVnode), __func__, resultRsp.ver, resultRsp.totalRows);
11,373,520✔
3269
  STREAM_CHECK_CONDITION_GOTO(resultRsp.totalRows == 0, TDB_CODE_SUCCESS);
11,373,983✔
3270
  size = tSerializeSStreamWalDataResponse(NULL, 0, &resultRsp);
279,827✔
3271
  buf = rpcMallocCont(size);
279,827✔
3272
  size = tSerializeSStreamWalDataResponse(buf, size, &resultRsp);
279,827✔
3273
  printDataBlock(sStreamReaderInfo->metaBlock, __func__, "meta", ((SStreamTask*)pTask)->streamId);
279,827✔
3274
  printDataBlock(resultRsp.deleteBlock, __func__, "delete", ((SStreamTask*)pTask)->streamId);
279,827✔
3275
  printDataBlock(resultRsp.tableBlock, __func__, "table", ((SStreamTask*)pTask)->streamId);
279,827✔
3276

3277
end:
11,373,983✔
3278
  if (code == 0 && resultRsp.totalRows == 0) {
11,373,520✔
3279
    code = TSDB_CODE_STREAM_NO_DATA;
11,093,230✔
3280
    size = sizeof(int64_t) * 2;
11,093,230✔
3281
    buf = rpcMallocCont(size);
11,093,230✔
3282
    *(int64_t*)buf = resultRsp.ver;
11,091,390✔
3283
    *(((int64_t*)buf) + 1) = resultRsp.verTime;
11,091,850✔
3284
  }
3285
  SRpcMsg rsp = {
11,373,296✔
3286
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
3287
  tmsgSendRsp(&rsp);
11,371,475✔
3288
  if (code == TSDB_CODE_STREAM_NO_DATA){
11,375,352✔
3289
    code = 0;
11,095,066✔
3290
  }
3291
  STREAM_PRINT_LOG_END_WITHID(code, lino);
11,375,352✔
3292
  blockDataDestroy(resultRsp.deleteBlock);
11,375,651✔
3293
  blockDataDestroy(resultRsp.tableBlock);
11,376,286✔
3294

3295
  return code;
11,376,053✔
3296
}
3297
static int32_t vnodeProcessStreamWalMetaDataNewReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
3,266,743✔
3298
  int32_t      code = 0;
3,266,743✔
3299
  int32_t      lino = 0;
3,266,743✔
3300
  void*        buf = NULL;
3,266,743✔
3301
  size_t       size = 0;
3,266,743✔
3302
  SSTriggerWalNewRsp resultRsp = {0};
3,266,743✔
3303
  
3304
  void* pTask = sStreamReaderInfo->pTask;
3,265,848✔
3305
  ST_TASK_DLOG("vgId:%d %s start, request paras lastVer:%" PRId64, TD_VID(pVnode), __func__, req->walMetaDataNewReq.lastVer);
3,265,847✔
3306

3307
  if (sStreamReaderInfo->metaBlock == NULL) {
3,265,615✔
3308
    STREAM_CHECK_RET_GOTO(createBlockForWalMetaNew((SSDataBlock**)&sStreamReaderInfo->metaBlock));
92,162✔
3309
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(sStreamReaderInfo->metaBlock, STREAM_RETURN_ROWS_NUM));
92,162✔
3310
  }
3311

3312
  resultRsp.metaBlock = sStreamReaderInfo->metaBlock;
3,267,467✔
3313
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerBlock, false, (SSDataBlock**)&resultRsp.dataBlock));
3,267,009✔
3314
  resultRsp.ver = req->walMetaDataNewReq.lastVer;
3,265,880✔
3315
  resultRsp.checkAlter = true;
3,266,803✔
3316
  resultRsp.indexHash = tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
3,266,803✔
3317
  STREAM_CHECK_NULL_GOTO(resultRsp.indexHash, terrno);
3,264,571✔
3318

3319
  STREAM_CHECK_RET_GOTO(processWalVerMetaDataNew(pVnode, sStreamReaderInfo, &resultRsp));
3,264,571✔
3320

3321
  STREAM_CHECK_CONDITION_GOTO(resultRsp.totalRows == 0, TDB_CODE_SUCCESS);
3,266,094✔
3322
  size = tSerializeSStreamWalDataResponse(NULL, 0, &resultRsp);
122,864✔
3323
  buf = rpcMallocCont(size);
123,569✔
3324
  size = tSerializeSStreamWalDataResponse(buf, size, &resultRsp);
123,569✔
3325
  printDataBlock(sStreamReaderInfo->metaBlock, __func__, "meta", ((SStreamTask*)pTask)->streamId);
123,569✔
3326
  printDataBlock(resultRsp.dataBlock, __func__, "data", ((SStreamTask*)pTask)->streamId);
123,569✔
3327
  printDataBlock(resultRsp.deleteBlock, __func__, "delete", ((SStreamTask*)pTask)->streamId);
123,569✔
3328
  printDataBlock(resultRsp.tableBlock, __func__, "table", ((SStreamTask*)pTask)->streamId);
123,569✔
3329
  printIndexHash(resultRsp.indexHash, pTask);
123,569✔
3330

3331
end:
3,266,799✔
3332
  if (resultRsp.totalRows == 0) {
3,265,861✔
3333
    code = TSDB_CODE_STREAM_NO_DATA;
3,144,156✔
3334
    size = sizeof(int64_t) * 2;
3,144,156✔
3335
    buf = rpcMallocCont(size);
3,144,156✔
3336
    *(int64_t*)buf = resultRsp.ver;
3,138,379✔
3337
    *(((int64_t*)buf) + 1) = resultRsp.verTime;
3,138,379✔
3338
  }
3339
  SRpcMsg rsp = {
3,262,629✔
3340
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
3341
  tmsgSendRsp(&rsp);
3,265,061✔
3342
  if (code == TSDB_CODE_STREAM_NO_DATA){
3,267,961✔
3343
    code = 0;
3,144,392✔
3344
  }
3345
  blockDataDestroy(resultRsp.dataBlock);
3,267,961✔
3346
  blockDataDestroy(resultRsp.deleteBlock);
3,263,764✔
3347
  blockDataDestroy(resultRsp.tableBlock);
3,264,009✔
3348
  tSimpleHashCleanup(resultRsp.indexHash);
3,264,699✔
3349

3350
  STREAM_PRINT_LOG_END_WITHID(code, lino);
3,262,941✔
3351

3352
  return code;
3,263,761✔
3353
}
3354

3355
static int32_t vnodeProcessStreamWalDataNewReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
7,431,052✔
3356
  int32_t      code = 0;
7,431,052✔
3357
  int32_t      lino = 0;
7,431,052✔
3358
  void*        buf = NULL;
7,431,052✔
3359
  size_t       size = 0;
7,431,052✔
3360
  SSTriggerWalNewRsp resultRsp = {0};
7,431,052✔
3361

3362
  void* pTask = sStreamReaderInfo->pTask;
7,431,052✔
3363
  ST_TASK_DLOG("vgId:%d %s start, request paras size:%zu", TD_VID(pVnode), __func__, taosArrayGetSize(req->walDataNewReq.versions));
7,431,052✔
3364

3365
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerBlock, false, (SSDataBlock**)&resultRsp.dataBlock));
7,431,052✔
3366
  resultRsp.indexHash = tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
7,431,282✔
3367
  STREAM_CHECK_NULL_GOTO(resultRsp.indexHash, terrno);
7,430,351✔
3368
  resultRsp.uidHash = tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
7,430,351✔
3369
  STREAM_CHECK_NULL_GOTO(resultRsp.uidHash, terrno);
7,431,049✔
3370

3371
  STREAM_CHECK_RET_GOTO(processWalVerDataNew(pVnode, sStreamReaderInfo, req->walDataNewReq.versions, req->walDataNewReq.ranges, &resultRsp));
7,431,049✔
3372
  ST_TASK_DLOG("vgId:%d %s get result last ver:%"PRId64" rows:%d", TD_VID(pVnode), __func__, resultRsp.ver, resultRsp.totalRows);
7,430,359✔
3373

3374
  STREAM_CHECK_CONDITION_GOTO(resultRsp.totalRows == 0, TDB_CODE_SUCCESS);
7,430,822✔
3375

3376
  size = tSerializeSStreamWalDataResponse(NULL, 0, &resultRsp);
165,913✔
3377
  buf = rpcMallocCont(size);
165,913✔
3378
  size = tSerializeSStreamWalDataResponse(buf, size, &resultRsp);
165,913✔
3379
  printDataBlock(resultRsp.dataBlock, __func__, "data", ((SStreamTask*)pTask)->streamId);
165,913✔
3380
  printIndexHash(resultRsp.indexHash, pTask);
165,913✔
3381

3382
end:
7,430,822✔
3383
  if (resultRsp.totalRows == 0) {
7,430,592✔
3384
    buf = rpcMallocCont(sizeof(int64_t));
7,264,679✔
3385
    *(int64_t *)buf = resultRsp.ver;
7,264,449✔
3386
    size = sizeof(int64_t);
7,264,449✔
3387
    code = TSDB_CODE_STREAM_NO_DATA;
7,264,449✔
3388
  }
3389
  SRpcMsg rsp = {
7,430,362✔
3390
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
3391
  tmsgSendRsp(&rsp);
7,430,822✔
3392
  if (code == TSDB_CODE_STREAM_NO_DATA){
7,431,052✔
3393
    code = 0;
7,265,139✔
3394
  }
3395

3396
  blockDataDestroy(resultRsp.dataBlock);
7,431,052✔
3397
  blockDataDestroy(resultRsp.deleteBlock);
7,429,470✔
3398
  blockDataDestroy(resultRsp.tableBlock);
7,429,930✔
3399
  tSimpleHashCleanup(resultRsp.indexHash);
7,430,592✔
3400
  tSimpleHashCleanup(resultRsp.uidHash);
7,429,899✔
3401
  STREAM_PRINT_LOG_END_WITHID(code, lino);
7,430,142✔
3402

3403
  return code;
7,431,223✔
3404
}
3405

3406
static int32_t vnodeProcessStreamWalCalcDataNewReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
943,289✔
3407
  int32_t      code = 0;
943,289✔
3408
  int32_t      lino = 0;
943,289✔
3409
  void*        buf = NULL;
943,289✔
3410
  size_t       size = 0;
943,289✔
3411
  SSTriggerWalNewRsp resultRsp = {0};
943,289✔
3412
  SSDataBlock* pBlock1 = NULL;
943,289✔
3413
  SSDataBlock* pBlock2 = NULL;
943,289✔
3414
  
3415
  void* pTask = sStreamReaderInfo->pTask;
943,289✔
3416
  ST_TASK_DLOG("vgId:%d %s start, request paras size:%zu", TD_VID(pVnode), __func__, taosArrayGetSize(req->walDataNewReq.versions));
943,289✔
3417

3418
  SSDataBlock* dataBlock = sStreamReaderInfo->isVtableStream ? sStreamReaderInfo->calcBlock : sStreamReaderInfo->triggerBlock;
943,289✔
3419
  STREAM_CHECK_RET_GOTO(createOneDataBlock(dataBlock, false, (SSDataBlock**)&resultRsp.dataBlock));
943,289✔
3420
  resultRsp.isCalc = sStreamReaderInfo->isVtableStream ? true : false;
943,289✔
3421
  resultRsp.indexHash = tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
943,289✔
3422
  STREAM_CHECK_NULL_GOTO(resultRsp.indexHash, terrno);
943,289✔
3423
  resultRsp.uidHash = tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
943,289✔
3424
  STREAM_CHECK_NULL_GOTO(resultRsp.uidHash, terrno);
943,289✔
3425

3426
  STREAM_CHECK_RET_GOTO(processWalVerDataNew(pVnode, sStreamReaderInfo, req->walDataNewReq.versions, req->walDataNewReq.ranges, &resultRsp));
943,289✔
3427
  STREAM_CHECK_CONDITION_GOTO(resultRsp.totalRows == 0, TDB_CODE_SUCCESS);
943,289✔
3428

3429
  if (!sStreamReaderInfo->isVtableStream){
229,900✔
3430
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->calcBlock, false, &pBlock2));
144,015✔
3431
  
3432
    blockDataTransform(pBlock2, resultRsp.dataBlock);
144,015✔
3433
    blockDataDestroy(resultRsp.dataBlock);
144,015✔
3434
    resultRsp.dataBlock = pBlock2;
144,015✔
3435
    pBlock2 = NULL;
144,015✔
3436
  }
3437

3438
  size = tSerializeSStreamWalDataResponse(NULL, 0, &resultRsp);
229,900✔
3439
  buf = rpcMallocCont(size);
229,670✔
3440
  size = tSerializeSStreamWalDataResponse(buf, size, &resultRsp);
229,900✔
3441
  printDataBlock(resultRsp.dataBlock, __func__, "data", ((SStreamTask*)pTask)->streamId);
229,900✔
3442
  printIndexHash(resultRsp.indexHash, pTask);
229,665✔
3443

3444
end:
943,054✔
3445
  if (resultRsp.totalRows == 0) {
943,289✔
3446
    buf = rpcMallocCont(sizeof(int64_t));
713,389✔
3447
    *(int64_t *)buf = resultRsp.ver;
713,389✔
3448
    size = sizeof(int64_t);
713,389✔
3449
    code = TSDB_CODE_STREAM_NO_DATA;
713,389✔
3450
  }
3451
  SRpcMsg rsp = {
943,289✔
3452
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
3453
  tmsgSendRsp(&rsp);
943,054✔
3454
  if (code == TSDB_CODE_STREAM_NO_DATA){
943,289✔
3455
    code = 0;
713,389✔
3456
  }
3457

3458
  blockDataDestroy(pBlock1);
943,289✔
3459
  blockDataDestroy(pBlock2);
943,054✔
3460
  blockDataDestroy(resultRsp.dataBlock);
943,054✔
3461
  blockDataDestroy(resultRsp.deleteBlock);
943,289✔
3462
  blockDataDestroy(resultRsp.tableBlock);
943,289✔
3463
  tSimpleHashCleanup(resultRsp.indexHash);
943,289✔
3464
  tSimpleHashCleanup(resultRsp.uidHash);
943,289✔
3465
  STREAM_PRINT_LOG_END_WITHID(code, lino);
943,087✔
3466

3467
  return code;
943,087✔
3468
}
3469

3470
static int32_t vnodeProcessStreamGroupColValueReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
236,128✔
3471
  int32_t code = 0;
236,128✔
3472
  int32_t lino = 0;
236,128✔
3473
  void*   buf = NULL;
236,128✔
3474
  size_t  size = 0;
236,128✔
3475
  SArray** gInfo = NULL;
236,128✔
3476
  
3477
  void* pTask = sStreamReaderInfo->pTask;
236,128✔
3478
  ST_TASK_DLOG("vgId:%d %s start, request gid:%" PRId64, TD_VID(pVnode), __func__, req->groupColValueReq.gid);
236,128✔
3479

3480
  gInfo = taosHashAcquire(sStreamReaderInfo->groupIdMap, &req->groupColValueReq.gid, POINTER_BYTES);
236,128✔
3481
  STREAM_CHECK_NULL_GOTO(gInfo, TSDB_CODE_STREAM_NO_CONTEXT);
236,128✔
3482
  SStreamGroupInfo pGroupInfo = {0};
236,128✔
3483
  pGroupInfo.gInfo = *gInfo;
236,128✔
3484

3485
  size = tSerializeSStreamGroupInfo(NULL, 0, &pGroupInfo, TD_VID(pVnode));
236,128✔
3486
  STREAM_CHECK_CONDITION_GOTO(size < 0, size);
3487
  buf = rpcMallocCont(size);
235,898✔
3488
  STREAM_CHECK_NULL_GOTO(buf, terrno);
236,128✔
3489
  size = tSerializeSStreamGroupInfo(buf, size, &pGroupInfo, TD_VID(pVnode));
236,128✔
3490
  STREAM_CHECK_CONDITION_GOTO(size < 0, size);
3491
end:
236,128✔
3492
  taosHashRelease(sStreamReaderInfo->groupIdMap, gInfo);
236,128✔
3493
  if (code != 0) {
236,128✔
3494
    rpcFreeCont(buf);
×
3495
    buf = NULL;
×
3496
    size = 0;
×
3497
  }
3498
  STREAM_PRINT_LOG_END_WITHID(code, lino);
236,128✔
3499
  SRpcMsg rsp = {
236,128✔
3500
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
3501
  tmsgSendRsp(&rsp);
236,128✔
3502

3503
  return code;
236,128✔
3504
}
3505

3506
static int32_t setVtableInfo(SVnode* pVnode, SArray* infos, SArray* cids, int64_t uid, uint64_t gid, SMetaReader* metaReader, SStreamTriggerReaderInfo* sStreamReaderInfo) {
163,886✔
3507
  int32_t              code = 0;
163,886✔
3508
  int32_t              lino = 0;
163,886✔
3509
  void* pTask = sStreamReaderInfo->pTask;
163,886✔
3510

3511
  VTableInfo* vTable = taosArrayReserve(infos, 1);
163,886✔
3512
  STREAM_CHECK_NULL_GOTO(vTable, terrno);
163,886✔
3513
  vTable->uid = uid;
163,886✔
3514
  vTable->gId = gid;
163,886✔
3515

3516
  ST_TASK_DLOG("vgId:%d %s put vtable uid:%"PRId64, TD_VID(pVnode), __func__, uid);
163,886✔
3517

3518
  code = sStreamReaderInfo->storageApi.metaReaderFn.getTableEntryByVersionUid(metaReader, sStreamReaderInfo->tableList.version, uid);
163,886✔
3519
  if (code != 0) {
163,886✔
3520
    ST_TASK_ELOG("vgId:%d %s get table entry by uid:%"PRId64" failed, msg:%s", TD_VID(pVnode), __func__, uid, tstrerror(code));
×
3521
    goto end;
×
3522
  }
3523
  if (atomic_load_8(&sStreamReaderInfo->isVtableOnlyTs) == 1) {
163,886✔
3524
    vTable->cols.nCols = metaReader->me.colRef.nCols;
9,010✔
3525
    vTable->cols.version = metaReader->me.colRef.version;
9,010✔
3526
    vTable->cols.pColRef = taosMemoryCalloc(metaReader->me.colRef.nCols, sizeof(SColRef));
9,010✔
3527
    STREAM_CHECK_NULL_GOTO(vTable->cols.pColRef, terrno);
9,010✔
3528
    for (size_t j = 0; j < metaReader->me.colRef.nCols; j++) {
54,060✔
3529
      memcpy(vTable->cols.pColRef + j, &metaReader->me.colRef.pColRef[j], sizeof(SColRef));
45,050✔
3530
    }
3531
  } else {
3532
    vTable->cols.nCols = taosArrayGetSize(cids);
154,876✔
3533
    vTable->cols.version = metaReader->me.colRef.version;
154,876✔
3534
    vTable->cols.pColRef = taosMemoryCalloc(taosArrayGetSize(cids), sizeof(SColRef));
154,876✔
3535
    STREAM_CHECK_NULL_GOTO(vTable->cols.pColRef, terrno);
154,876✔
3536
    for (size_t i = 0; i < taosArrayGetSize(cids); i++) {
587,785✔
3537
      for (size_t j = 0; j < metaReader->me.colRef.nCols; j++) {
1,824,026✔
3538
        if (metaReader->me.colRef.pColRef[j].hasRef &&
1,668,460✔
3539
            metaReader->me.colRef.pColRef[j].id == *(col_id_t*)taosArrayGet(cids, i)) {
1,231,223✔
3540
          memcpy(vTable->cols.pColRef + i, &metaReader->me.colRef.pColRef[j], sizeof(SColRef));
277,343✔
3541
          break;
277,343✔
3542
        }
3543
      }
3544
    }
3545
  }
3546
  tDecoderClear(&metaReader->coder);
163,886✔
3547

3548
end:
163,886✔
3549
  return code;
163,886✔
3550
}
3551

3552
static int32_t getAllVinfo(SVnode* pVnode, SStreamMsgVTableInfo* vTableInfo, SArray* cids, SMetaReader* metaReader, SStreamTriggerReaderInfo* sStreamReaderInfo){
66,540✔
3553
  int32_t              code = 0;
66,540✔
3554
  int32_t              lino = 0;
66,540✔
3555
  void* pTask = sStreamReaderInfo->pTask;
66,540✔
3556
  SArray*              pTableListArray = NULL;
66,540✔
3557

3558

3559
  pTableListArray = qStreamGetTableArrayList(sStreamReaderInfo);
66,540✔
3560
  STREAM_CHECK_NULL_GOTO(pTableListArray, terrno);
66,540✔
3561

3562
  vTableInfo->infos = taosArrayInit(taosArrayGetSize(pTableListArray), sizeof(VTableInfo));
66,540✔
3563
  STREAM_CHECK_NULL_GOTO(vTableInfo->infos, terrno);
66,540✔
3564

3565
  for (size_t i = 0; i < taosArrayGetSize(pTableListArray); i++) {
230,426✔
3566
    SStreamTableKeyInfo* pKeyInfo = taosArrayGetP(pTableListArray, i);
163,886✔
3567
    if (pKeyInfo == NULL || pKeyInfo->markedDeleted) {
163,886✔
3568
      continue;
×
3569
    }
3570
    code = setVtableInfo(pVnode, vTableInfo->infos, cids, pKeyInfo->uid, pKeyInfo->groupId, metaReader, sStreamReaderInfo);
163,886✔
3571
    if (code != 0) {
163,886✔
3572
      ST_TASK_WLOG("vgId:%d %s set vtable info uid:%"PRId64" failed, msg:%s", TD_VID(pVnode), __func__, pKeyInfo->uid, tstrerror(code));
×
3573
      code = 0;
×
3574
      continue;
×
3575
    }
3576
  }
3577

3578
end:
66,540✔
3579
  taosArrayDestroyP(pTableListArray, taosMemFree);
66,540✔
3580
  return code;
66,540✔
3581
}
3582

3583
static int32_t getSpicificVinfo(SVnode* pVnode, SStreamMsgVTableInfo* vTableInfo, SArray* uids, SArray* cids, SMetaReader* metaReader, SStreamTriggerReaderInfo* sStreamReaderInfo){
×
3584
  int32_t              code = 0;
×
3585
  int32_t              lino = 0;
×
3586
  void* pTask = sStreamReaderInfo->pTask;
×
3587

3588
  vTableInfo->infos = taosArrayInit(taosArrayGetSize(uids), sizeof(VTableInfo));
×
3589
  STREAM_CHECK_NULL_GOTO(vTableInfo->infos, terrno);
×
3590

3591
  for (size_t i = 0; i < taosArrayGetSize(uids); i++) {
×
3592
    int64_t* uid = taosArrayGet(uids, i);
×
3593
    STREAM_CHECK_NULL_GOTO(uid, terrno);
×
3594

3595
    taosRLockLatch(&sStreamReaderInfo->lock);
×
3596
    uint64_t groupId = qStreamGetGroupIdFromOrigin(sStreamReaderInfo, *uid);
×
3597
    taosRUnLockLatch(&sStreamReaderInfo->lock);
×
3598
    if (groupId == -1) {
×
3599
      ST_TASK_WLOG("vgId:%d %s uid:%"PRId64" not found in stream group", TD_VID(pVnode), __func__, *uid);
×
3600
      continue;
×
3601
    }
3602
    code = setVtableInfo(pVnode, vTableInfo->infos, cids, *uid, groupId, metaReader, sStreamReaderInfo);
×
3603
    if (code != 0) {
×
3604
      ST_TASK_WLOG("vgId:%d %s set vtable info uid:%"PRId64" failed, msg:%s", TD_VID(pVnode), __func__, *uid, tstrerror(code));
×
3605
      code = 0;
×
3606
      continue;
×
3607
    }
3608
  }
3609
  
3610
end:
×
3611
  return code;
×
3612
}
3613

3614
static int32_t vnodeProcessStreamVTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
66,540✔
3615
  int32_t              code = 0;
66,540✔
3616
  int32_t              lino = 0;
66,540✔
3617
  void*                buf = NULL;
66,540✔
3618
  size_t               size = 0;
66,540✔
3619
  SStreamMsgVTableInfo vTableInfo = {0};
66,540✔
3620
  SMetaReader          metaReader = {0};
66,540✔
3621

3622
  void* pTask = sStreamReaderInfo->pTask;
66,540✔
3623
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
66,540✔
3624

3625
  SArray* cids = req->virTableInfoReq.cids;
66,540✔
3626
  STREAM_CHECK_NULL_GOTO(cids, terrno);
66,540✔
3627

3628
  if (taosArrayGetSize(cids) == 1 && *(col_id_t*)taosArrayGet(cids, 0) == PRIMARYKEY_TIMESTAMP_COL_ID){
66,540✔
3629
    (void)atomic_val_compare_exchange_8(&sStreamReaderInfo->isVtableOnlyTs, 0, 1);
9,010✔
3630
  }
3631
  sStreamReaderInfo->storageApi.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &sStreamReaderInfo->storageApi.metaFn);
66,540✔
3632

3633
  if (req->virTableInfoReq.fetchAllTable || req->virTableInfoReq.uids == NULL || taosArrayGetSize(req->virTableInfoReq.uids) == 0) {
66,540✔
3634
    STREAM_CHECK_RET_GOTO(getAllVinfo(pVnode, &vTableInfo, cids, &metaReader, sStreamReaderInfo));
66,540✔
3635
  } else {
3636
    STREAM_CHECK_RET_GOTO(getSpicificVinfo(pVnode, &vTableInfo, req->virTableInfoReq.uids, cids, &metaReader, sStreamReaderInfo));
×
3637
  }
3638
  ST_TASK_DLOG("vgId:%d %s end, size:%"PRIzu, TD_VID(pVnode), __func__, taosArrayGetSize(vTableInfo.infos));
66,540✔
3639
  STREAM_CHECK_RET_GOTO(buildVTableInfoRsp(&vTableInfo, &buf, &size));
66,540✔
3640

3641
end:
66,540✔
3642
  tDestroySStreamMsgVTableInfo(&vTableInfo);
66,540✔
3643
  sStreamReaderInfo->storageApi.metaReaderFn.clearReader(&metaReader);
66,540✔
3644
  STREAM_PRINT_LOG_END_WITHID(code, lino);
66,540✔
3645
  SRpcMsg rsp = {
66,540✔
3646
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
3647
  tmsgSendRsp(&rsp);
66,540✔
3648
  return code;
66,540✔
3649
}
3650

3651
static int32_t vnodeProcessStreamOTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
116,568✔
3652
  int32_t                   code = 0;
116,568✔
3653
  int32_t                   lino = 0;
116,568✔
3654
  void*                     buf = NULL;
116,568✔
3655
  size_t                    size = 0;
116,568✔
3656
  SSTriggerOrigTableInfoRsp oTableInfo = {0};
116,568✔
3657
  SMetaReader               metaReader = {0};
116,568✔
3658
  void*                     pTask = sStreamReaderInfo->pTask;
116,568✔
3659

3660
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
116,568✔
3661

3662
  SArray* cols = req->origTableInfoReq.cols;
116,568✔
3663
  STREAM_CHECK_NULL_GOTO(cols, terrno);
116,568✔
3664

3665
  oTableInfo.cols = taosArrayInit(taosArrayGetSize(cols), sizeof(OTableInfoRsp));
116,568✔
3666

3667
  STREAM_CHECK_NULL_GOTO(oTableInfo.cols, terrno);
116,568✔
3668

3669
  sStreamReaderInfo->storageApi.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &sStreamReaderInfo->storageApi.metaFn);
116,568✔
3670
  for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
429,951✔
3671
    OTableInfo*    oInfo = taosArrayGet(cols, i);
313,181✔
3672
    OTableInfoRsp* vTableInfo = taosArrayReserve(oTableInfo.cols, 1);
313,150✔
3673
    STREAM_CHECK_NULL_GOTO(oInfo, terrno);
312,948✔
3674
    STREAM_CHECK_NULL_GOTO(vTableInfo, terrno);
312,948✔
3675
    code = sStreamReaderInfo->storageApi.metaReaderFn.getTableEntryByName(&metaReader, oInfo->refTableName);
312,948✔
3676
    if (code != 0) {
313,181✔
3677
      code = 0;
202✔
3678
      ST_TASK_ELOG("vgId:%d %s get table entry by name:%s failed, msg:%s", TD_VID(pVnode), __func__, oInfo->refTableName, tstrerror(code));
202✔
3679
      continue;
202✔
3680
    }
3681
    vTableInfo->uid = metaReader.me.uid;
312,979✔
3682
    ST_TASK_DLOG("vgId:%d %s get original uid:%"PRId64, TD_VID(pVnode), __func__, vTableInfo->uid);
312,979✔
3683

3684
    SSchemaWrapper* sSchemaWrapper = NULL;
313,181✔
3685
    if (metaReader.me.type == TD_CHILD_TABLE) {
313,181✔
3686
      int64_t suid = metaReader.me.ctbEntry.suid;
311,350✔
3687
      vTableInfo->suid = suid;
311,350✔
3688
      tDecoderClear(&metaReader.coder);
311,350✔
3689
      STREAM_CHECK_RET_GOTO(sStreamReaderInfo->storageApi.metaReaderFn.getTableEntryByUid(&metaReader, suid));
311,350✔
3690
      sSchemaWrapper = &metaReader.me.stbEntry.schemaRow;
311,148✔
3691
    } else if (metaReader.me.type == TD_NORMAL_TABLE) {
1,831✔
3692
      vTableInfo->suid = 0;
1,831✔
3693
      sSchemaWrapper = &metaReader.me.ntbEntry.schemaRow;
1,831✔
3694
    } else {
3695
      ST_TASK_ELOG("invalid table type:%d", metaReader.me.type);
×
3696
    }
3697

3698
    for (size_t j = 0; j < sSchemaWrapper->nCols; j++) {
1,070,169✔
3699
      SSchema* s = sSchemaWrapper->pSchema + j;
1,070,573✔
3700
      if (strcmp(s->name, oInfo->refColName) == 0) {
1,069,437✔
3701
        vTableInfo->cid = s->colId;
312,948✔
3702
        break;
312,480✔
3703
      }
3704
    }
3705
    if (vTableInfo->cid == 0) {
311,414✔
3706
      stError("vgId:%d %s, not found col %s in table %s", TD_VID(pVnode), __func__, oInfo->refColName,
×
3707
              oInfo->refTableName);
3708
    }
3709
    tDecoderClear(&metaReader.coder);
312,480✔
3710
  }
3711

3712
  STREAM_CHECK_RET_GOTO(buildOTableInfoRsp(&oTableInfo, &buf, &size));
116,770✔
3713

3714
end:
116,568✔
3715
  tDestroySTriggerOrigTableInfoRsp(&oTableInfo);
116,568✔
3716
  sStreamReaderInfo->storageApi.metaReaderFn.clearReader(&metaReader);
116,568✔
3717
  STREAM_PRINT_LOG_END_WITHID(code, lino);
116,568✔
3718
  SRpcMsg rsp = {
116,568✔
3719
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
3720
  tmsgSendRsp(&rsp);
116,568✔
3721
  return code;
116,568✔
3722
}
3723

3724
static int32_t vnodeProcessStreamVTableTagInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
684,516✔
3725
  int32_t                   code = 0;
684,516✔
3726
  int32_t                   lino = 0;
684,516✔
3727
  void*                     buf = NULL;
684,516✔
3728
  size_t                    size = 0;
684,516✔
3729
  SSDataBlock* pBlock = NULL;
684,516✔
3730

3731
  SMetaReader               metaReader = {0};
684,516✔
3732
  SMetaReader               metaReaderStable = {0};
684,516✔
3733
  int64_t streamId = req->base.streamId;
684,516✔
3734
  stsDebug("vgId:%d %s start", TD_VID(pVnode), __func__);
684,516✔
3735

3736
  SArray* cols = req->virTablePseudoColReq.cids;
684,516✔
3737
  STREAM_CHECK_NULL_GOTO(cols, terrno);
684,516✔
3738

3739
  sStreamReaderInfo->storageApi.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &sStreamReaderInfo->storageApi.metaFn);
684,516✔
3740
  STREAM_CHECK_RET_GOTO(sStreamReaderInfo->storageApi.metaReaderFn.getTableEntryByUid(&metaReader, req->virTablePseudoColReq.uid));
684,516✔
3741

3742
  STREAM_CHECK_CONDITION_GOTO(metaReader.me.type != TD_VIRTUAL_CHILD_TABLE && metaReader.me.type != TD_VIRTUAL_NORMAL_TABLE, TSDB_CODE_INVALID_PARA);
684,283✔
3743

3744
  STREAM_CHECK_RET_GOTO(createDataBlock(&pBlock));
684,283✔
3745
  if (metaReader.me.type == TD_VIRTUAL_NORMAL_TABLE) {
684,283✔
3746
    STREAM_CHECK_CONDITION_GOTO (taosArrayGetSize(cols) < 1 || *(col_id_t*)taosArrayGet(cols, 0) != -1, TSDB_CODE_INVALID_PARA);
2,070✔
3747
    SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN, -1);
2,070✔
3748
    STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
2,070✔
3749
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, 1));
2,070✔
3750
    pBlock->info.rows = 1;
2,070✔
3751
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, 0);
2,070✔
3752
    STREAM_CHECK_NULL_GOTO(pDst, terrno);
2,070✔
3753
    STREAM_CHECK_RET_GOTO(varColSetVarData(pDst, 0, metaReader.me.name, strlen(metaReader.me.name), false));
2,070✔
3754
  } else if (metaReader.me.type == TD_VIRTUAL_CHILD_TABLE){
682,213✔
3755
    int64_t suid = metaReader.me.ctbEntry.suid;
682,446✔
3756
    sStreamReaderInfo->storageApi.metaReaderFn.readerReleaseLock(&metaReader);
682,446✔
3757
    sStreamReaderInfo->storageApi.metaReaderFn.initReader(&metaReaderStable, pVnode, META_READER_LOCK, &sStreamReaderInfo->storageApi.metaFn);
682,446✔
3758

3759
    STREAM_CHECK_RET_GOTO(sStreamReaderInfo->storageApi.metaReaderFn.getTableEntryByUid(&metaReaderStable, suid));
682,446✔
3760
    SSchemaWrapper*  sSchemaWrapper = &metaReaderStable.me.stbEntry.schemaTag;
682,216✔
3761
    for (size_t i = 0; i < taosArrayGetSize(cols); i++){
1,729,653✔
3762
      col_id_t* id = taosArrayGet(cols, i);
1,047,204✔
3763
      STREAM_CHECK_NULL_GOTO(id, terrno);
1,047,437✔
3764
      if (*id == -1) {
1,047,437✔
3765
        SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN, -1);
671,854✔
3766
        STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
671,854✔
3767
        continue;
671,854✔
3768
      }
3769
      size_t j = 0;
375,583✔
3770
      for (; j < sSchemaWrapper->nCols; j++) {
706,725✔
3771
        SSchema* s = sSchemaWrapper->pSchema + j;
706,725✔
3772
        if (s->colId == *id) {
706,492✔
3773
          SColumnInfoData idata = createColumnInfoData(s->type, s->bytes, s->colId);
375,350✔
3774
          STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
375,583✔
3775
          break;
375,583✔
3776
        }
3777
      }
3778
      if (j == sSchemaWrapper->nCols) {
375,583✔
3779
        SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_NULL, CHAR_BYTES, *id);
×
3780
        STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
×
3781
      }
3782
    }
3783
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, 1));
682,446✔
3784
    pBlock->info.rows = 1;
682,446✔
3785
    
3786
    for (size_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++){
1,728,755✔
3787
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
1,047,437✔
3788
      STREAM_CHECK_NULL_GOTO(pDst, terrno);
1,047,204✔
3789

3790
      if (pDst->info.colId == -1) {
1,047,204✔
3791
        STREAM_CHECK_RET_GOTO(varColSetVarData(pDst, 0, metaReader.me.name, strlen(metaReader.me.name), false));
671,854✔
3792
        continue;
670,959✔
3793
      }
3794
      if (pDst->info.type == TSDB_DATA_TYPE_NULL) {
375,583✔
3795
        STREAM_CHECK_RET_GOTO(colDataSetVal(pDst, 0, NULL, true));
×
3796
        continue;
×
3797
      }
3798

3799
      STagVal val = {0};
375,583✔
3800
      val.cid = pDst->info.colId;
375,583✔
3801
      const char* p = sStreamReaderInfo->storageApi.metaFn.extractTagVal(metaReader.me.ctbEntry.pTags, pDst->info.type, &val);
375,350✔
3802

3803
      char* data = NULL;
375,583✔
3804
      if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
375,583✔
3805
        data = tTagValToData((const STagVal*)p, false);
375,583✔
3806
      } else {
3807
        data = (char*)p;
×
3808
      }
3809

3810
      STREAM_CHECK_RET_GOTO(colDataSetVal(pDst, 0, data,
375,350✔
3811
                            (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data))));
3812

3813
      if ((pDst->info.type != TSDB_DATA_TYPE_JSON) && (p != NULL) && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
375,350✔
3814
          (data != NULL)) {
3815
        taosMemoryFree(data);
326,902✔
3816
      }
3817
    }
3818
  } else {
3819
    stError("vgId:%d %s, invalid table type:%d", TD_VID(pVnode), __func__, metaReader.me.type);
×
3820
    code = TSDB_CODE_INVALID_PARA;
×
3821
    goto end;
×
3822
  }
3823
  
3824
  stsDebug("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlock->info.rows);
683,854✔
3825
  printDataBlock(pBlock, __func__, "", streamId);
684,516✔
3826
  STREAM_CHECK_RET_GOTO(buildRsp(pBlock, &buf, &size));
684,516✔
3827

3828
end:
684,516✔
3829
  if(size == 0){
684,286✔
3830
    code = TSDB_CODE_STREAM_NO_DATA;
×
3831
  }
3832
  sStreamReaderInfo->storageApi.metaReaderFn.clearReader(&metaReaderStable);
684,286✔
3833
  sStreamReaderInfo->storageApi.metaReaderFn.clearReader(&metaReader);
684,516✔
3834
  STREAM_PRINT_LOG_END(code, lino);
684,516✔
3835
  SRpcMsg rsp = {
684,516✔
3836
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
3837
  tmsgSendRsp(&rsp);
684,286✔
3838
  blockDataDestroy(pBlock);
684,516✔
3839
  return code;
684,516✔
3840
}
3841

3842
static int32_t vnodeProcessStreamFetchMsg(SVnode* pVnode, SRpcMsg* pMsg, SQueueInfo *pInfo) {
3,594,251✔
3843
  int32_t            code = 0;
3,594,251✔
3844
  int32_t            lino = 0;
3,594,251✔
3845
  void*              buf = NULL;
3,594,251✔
3846
  size_t             size = 0;
3,594,251✔
3847
  void*              taskAddr = NULL;
3,594,251✔
3848
  SArray*            pResList = NULL;
3,594,251✔
3849
  bool               hasNext = false;
3,594,251✔
3850

3851
  SResFetchReq req = {0};
3,594,251✔
3852
  STREAM_CHECK_CONDITION_GOTO(tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0,
3,594,251✔
3853
                              TSDB_CODE_QRY_INVALID_INPUT);
3854
  SArray* calcInfoList = (SArray*)qStreamGetReaderInfo(req.queryId, req.taskId, &taskAddr);
3,593,986✔
3855
  STREAM_CHECK_NULL_GOTO(calcInfoList, terrno);
3,594,251✔
3856

3857
  STREAM_CHECK_CONDITION_GOTO(req.execId < 0, TSDB_CODE_INVALID_PARA);
3,594,251✔
3858
  SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo = taosArrayGetP(calcInfoList, req.execId);
3,594,251✔
3859
  STREAM_CHECK_NULL_GOTO(sStreamReaderCalcInfo, terrno);
3,594,251✔
3860
  sStreamReaderCalcInfo->rtInfo.execId = req.execId;
3,594,251✔
3861

3862
  void* pTask = sStreamReaderCalcInfo->pTask;
3,594,251✔
3863
  ST_TASK_DLOG("vgId:%d %s start, execId:%d, reset:%d, pTaskInfo:%p, scan type:%d", TD_VID(pVnode), __func__, req.execId, req.reset,
3,594,251✔
3864
               sStreamReaderCalcInfo->pTaskInfo, nodeType(sStreamReaderCalcInfo->calcAst->pNode));
3865

3866
  if (req.reset) {
3,594,251✔
3867
    int64_t uid = 0;
3,419,251✔
3868
    if (req.dynTbname) {
3,419,251✔
3869
      SArray* vals = req.pStRtFuncInfo->pStreamPartColVals;
26,948✔
3870
      for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
26,948✔
3871
        SStreamGroupValue* pValue = taosArrayGet(vals, i);
26,948✔
3872
        if (pValue != NULL && pValue->isTbname) {
26,948✔
3873
          uid = pValue->uid;
26,948✔
3874
          break;
26,948✔
3875
        }
3876
      }
3877
    }
3878
    
3879
    SReadHandle handle = {0};
3,419,251✔
3880
    handle.vnode = pVnode;
3,419,251✔
3881
    handle.pMsgCb = &pVnode->msgCb;
3,419,251✔
3882
    handle.pWorkerCb = pInfo->workerCb;
3,419,251✔
3883
    handle.uid = uid;
3,419,251✔
3884
    handle.cacheSttStatis = true;
3,419,251✔
3885

3886
    initStorageAPI(&handle.api);
3,419,251✔
3887
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(sStreamReaderCalcInfo->calcAst->pNode) ||
3,419,251✔
3888
      QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == nodeType(sStreamReaderCalcInfo->calcAst->pNode)){
2,961,704✔
3889
      STimeRangeNode* node = (STimeRangeNode*)((STableScanPhysiNode*)(sStreamReaderCalcInfo->calcAst->pNode))->pTimeRange;
2,668,777✔
3890
      if (node != NULL) {
2,669,007✔
3891
        STREAM_CHECK_RET_GOTO(processCalaTimeRange(sStreamReaderCalcInfo, &req, node, &handle, false));
331,940✔
3892
      } else {
3893
        ST_TASK_DLOG("vgId:%d %s no scan time range node", TD_VID(pVnode), __func__);
2,337,067✔
3894
      }
3895

3896
      node = (STimeRangeNode*)((STableScanPhysiNode*)(sStreamReaderCalcInfo->calcAst->pNode))->pExtTimeRange;
2,669,007✔
3897
      if (node != NULL) {
2,669,007✔
3898
        STREAM_CHECK_RET_GOTO(processCalaTimeRange(sStreamReaderCalcInfo, &req, node, &handle, true));
75,614✔
3899
      } else {
3900
        ST_TASK_DLOG("vgId:%d %s no interp time range node", TD_VID(pVnode), __func__);
2,593,393✔
3901
      }      
3902
    }
3903

3904
    TSWAP(sStreamReaderCalcInfo->rtInfo.funcInfo, *req.pStRtFuncInfo);
3,419,251✔
3905
    sStreamReaderCalcInfo->rtInfo.funcInfo.hasPlaceHolder = sStreamReaderCalcInfo->hasPlaceHolder;
3,419,251✔
3906
    handle.streamRtInfo = &sStreamReaderCalcInfo->rtInfo;
3,419,251✔
3907

3908
    if (sStreamReaderCalcInfo->pTaskInfo == NULL || !qNeedReset(sStreamReaderCalcInfo->pTaskInfo)) {
3,419,251✔
3909
      qDestroyTask(sStreamReaderCalcInfo->pTaskInfo);
379,337✔
3910
      STREAM_CHECK_RET_GOTO(qCreateStreamExecTaskInfo(&sStreamReaderCalcInfo->pTaskInfo,
379,337✔
3911
                                                    sStreamReaderCalcInfo->calcScanPlan, &handle, NULL, TD_VID(pVnode),
3912
                                                    req.taskId));
3913
    } else {
3914
      STREAM_CHECK_RET_GOTO(qResetTableScan(sStreamReaderCalcInfo->pTaskInfo, &handle));
3,039,914✔
3915
    }
3916

3917
    STREAM_CHECK_RET_GOTO(qSetTaskId(sStreamReaderCalcInfo->pTaskInfo, req.taskId, req.queryId));
3,419,251✔
3918
  }
3919

3920
  if (req.pOpParam != NULL) {
3,593,990✔
3921
    qUpdateOperatorParam(sStreamReaderCalcInfo->pTaskInfo, (void*)req.pOpParam);
288,564✔
3922
  }
3923

3924
  pResList = taosArrayInit(4, POINTER_BYTES);
3,593,990✔
3925
  STREAM_CHECK_NULL_GOTO(pResList, terrno);
3,593,990✔
3926
  uint64_t ts = 0;
3,593,990✔
3927
  STREAM_CHECK_RET_GOTO(qExecTaskOpt(sStreamReaderCalcInfo->pTaskInfo, pResList, &ts, &hasNext, NULL, req.pOpParam != NULL));
3,594,251✔
3928

3929
  for(size_t i = 0; i < taosArrayGetSize(pResList); i++){
8,124,743✔
3930
    SSDataBlock* pBlock = taosArrayGetP(pResList, i);
4,531,872✔
3931
    if (pBlock == NULL) continue;
4,531,872✔
3932
    printDataBlock(pBlock, __func__, "fetch", ((SStreamTask*)pTask)->streamId);
4,531,872✔
3933
/*    
3934
    if (sStreamReaderCalcInfo->rtInfo.funcInfo.withExternalWindow) {
3935
      STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, sStreamReaderCalcInfo->pFilterInfo, NULL));
3936
      printDataBlock(pBlock, __func__, "fetch filter");
3937
    }
3938
*/    
3939
  }
3940

3941
end:
3,594,021✔
3942
  STREAM_CHECK_RET_GOTO(streamBuildFetchRsp(pResList, hasNext, &buf, &size, pVnode->config.tsdbCfg.precision));
3,594,021✔
3943
  taosArrayDestroy(pResList);
3,594,021✔
3944
  streamReleaseTask(taskAddr);
3,594,251✔
3945

3946
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_TDB_TABLE_NOT_EXIST){
3,594,251✔
3947
    code = TDB_CODE_SUCCESS;
×
3948
  }
3949
  STREAM_PRINT_LOG_END(code, lino);
3,594,251✔
3950
  SRpcMsg rsp = {.msgType = TDMT_STREAM_FETCH_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
3,594,251✔
3951
  tmsgSendRsp(&rsp);
3,594,021✔
3952
  tDestroySResFetchReq(&req);
3,594,251✔
3953
  if (TDB_CODE_SUCCESS != code) {
3,593,761✔
3954
    ST_TASK_ELOG("vgId:%d %s failed, code:%d - %s", TD_VID(pVnode), __func__,
×
3955
                 code, tstrerror(code));
3956
  }
3957
  return code;
3,593,761✔
3958
}
3959

3960
static int32_t initTableList(SStreamTriggerReaderInfo* sStreamReaderInfo, SVnode* pVnode) {
31,955,991✔
3961
  int32_t code = 0;
31,955,991✔
3962
  if (sStreamReaderInfo->tableList.pTableList != NULL) {  
31,955,991✔
3963
    return code;
31,691,244✔
3964
  }
3965
  taosWLockLatch(&sStreamReaderInfo->lock);
266,537✔
3966
  sStreamReaderInfo->pVnode = pVnode;
266,537✔
3967
  initStorageAPI(&sStreamReaderInfo->storageApi);
266,537✔
3968
  if (sStreamReaderInfo->tableList.pTableList == NULL) {
266,081✔
3969
    code = initStreamTableListInfo(&sStreamReaderInfo->tableList);
266,081✔
3970
    if (code == 0) {
266,537✔
3971
      code = generateTablistForStreamReader(pVnode, sStreamReaderInfo);
266,537✔
3972
      if (code != 0) {
266,537✔
3973
        qStreamDestroyTableInfo(&sStreamReaderInfo->tableList);
×
3974
      } else {
3975
        sStreamReaderInfo->tableList.version = pVnode->state.applied;
266,537✔
3976
        stDebug("vgId:%d %s init table list for stream reader, table num:%zu, version:%" PRId64,
266,537✔
3977
                TD_VID(pVnode), __func__, taosArrayGetSize(sStreamReaderInfo->tableList.pTableList), sStreamReaderInfo->tableList.version);
3978
      }
3979
    }
3980
  }
3981
  taosWUnLockLatch(&sStreamReaderInfo->lock);
266,537✔
3982
  return code;
266,537✔
3983
}
3984

3985
int32_t vnodeProcessStreamReaderMsg(SVnode* pVnode, SRpcMsg* pMsg, SQueueInfo *pInfo) {
35,615,017✔
3986
  int32_t                   code = 0;
35,615,017✔
3987
  int32_t                   lino = 0;
35,615,017✔
3988
  SSTriggerPullRequestUnion req = {0};
35,615,017✔
3989
  void*                     taskAddr = NULL;
35,615,192✔
3990
  bool                      sendRsp = false;
35,611,017✔
3991

3992
  vDebug("vgId:%d, msg:%p in stream reader queue is processing", pVnode->config.vgId, pMsg);
35,611,017✔
3993
  if (!syncIsReadyForRead(pVnode->sync)) {
35,612,384✔
3994
    vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
42,077✔
3995
    return 0;
42,077✔
3996
  }
3997

3998
  if (pMsg->msgType == TDMT_STREAM_FETCH) {
35,572,250✔
3999
    return vnodeProcessStreamFetchMsg(pVnode, pMsg, pInfo);
3,594,251✔
4000
  } else if (pMsg->msgType == TDMT_STREAM_TRIGGER_PULL) {
31,978,201✔
4001
    void*   pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
31,979,354✔
4002
    int32_t len = pMsg->contLen - sizeof(SMsgHead);
31,978,664✔
4003
    STREAM_CHECK_RET_GOTO(tDeserializeSTriggerPullRequest(pReq, len, &req));
31,979,124✔
4004
    stDebug("vgId:%d %s start, type:%d, streamId:%" PRIx64 ", readerTaskId:%" PRIx64 ", sessionId:%" PRIx64 ", applied:%" PRIx64,
31,975,703✔
4005
            TD_VID(pVnode), __func__, req.base.type, req.base.streamId, req.base.readerTaskId, req.base.sessionId, pVnode->state.applied);
4006
    SStreamTriggerReaderInfo* sStreamReaderInfo = qStreamGetReaderInfo(req.base.streamId, req.base.readerTaskId, &taskAddr);
31,976,903✔
4007
    STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
31,977,502✔
4008
    STREAM_CHECK_RET_GOTO(initTableList(sStreamReaderInfo, pVnode));
31,956,426✔
4009
    sendRsp = true;
31,958,014✔
4010
    switch (req.base.type) {
31,958,014✔
4011
      case STRIGGER_PULL_SET_TABLE:
116,568✔
4012
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamSetTableReq(pVnode, pMsg, &req, sStreamReaderInfo));
116,568✔
4013
        break;
116,568✔
4014
      case STRIGGER_PULL_LAST_TS:
258,836✔
4015
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamLastTsReq(pVnode, pMsg, &req, sStreamReaderInfo));
258,836✔
4016
        break;
258,836✔
4017
      case STRIGGER_PULL_FIRST_TS:
206,877✔
4018
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamFirstTsReq(pVnode, pMsg, &req, sStreamReaderInfo));
206,877✔
4019
        break;
206,877✔
4020
      case STRIGGER_PULL_TSDB_META:
510,530✔
4021
      case STRIGGER_PULL_TSDB_META_NEXT:
4022
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbMetaReq(pVnode, pMsg, &req, sStreamReaderInfo));
510,530✔
4023
        break;
510,530✔
4024
      case STRIGGER_PULL_TSDB_TS_DATA:
63,884✔
4025
        if (sStreamReaderInfo->isVtableStream) {
63,884✔
4026
          STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbTsDataReqVTable(pVnode, pMsg, &req, sStreamReaderInfo));
×
4027
        } else {
4028
          STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbTsDataReqNonVTable(pVnode, pMsg, &req, sStreamReaderInfo));
63,884✔
4029
        }
4030
        break;
63,884✔
4031
      case STRIGGER_PULL_TSDB_TRIGGER_DATA:
111,140✔
4032
      case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT:
4033
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbTriggerDataReq(pVnode, pMsg, &req, sStreamReaderInfo));
111,140✔
4034
        break;
55,570✔
4035
      case STRIGGER_PULL_TSDB_CALC_DATA:
6,220,618✔
4036
      case STRIGGER_PULL_TSDB_CALC_DATA_NEXT:
4037
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbCalcDataReq(pVnode, pMsg, &req, sStreamReaderInfo));
6,220,618✔
4038
        break;
6,220,208✔
4039
      case STRIGGER_PULL_TSDB_DATA:
347,460✔
4040
      case STRIGGER_PULL_TSDB_DATA_NEXT:
4041
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbVirtalDataReq(pVnode, pMsg, &req, sStreamReaderInfo));
347,460✔
4042
        break;
345,474✔
4043
      case STRIGGER_PULL_GROUP_COL_VALUE:
236,128✔
4044
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamGroupColValueReq(pVnode, pMsg, &req, sStreamReaderInfo));
236,128✔
4045
        break;
236,128✔
4046
      case STRIGGER_PULL_VTABLE_INFO:
66,540✔
4047
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamVTableInfoReq(pVnode, pMsg, &req, sStreamReaderInfo));
66,540✔
4048
        break;
66,540✔
4049
      case STRIGGER_PULL_VTABLE_PSEUDO_COL:
684,516✔
4050
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamVTableTagInfoReq(pVnode, pMsg, &req, sStreamReaderInfo));
684,516✔
4051
        break;
684,516✔
4052
      case STRIGGER_PULL_OTABLE_INFO:
116,568✔
4053
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamOTableInfoReq(pVnode, pMsg, &req, sStreamReaderInfo));
116,568✔
4054
        break;
116,568✔
4055
      case STRIGGER_PULL_WAL_META_NEW:
11,373,206✔
4056
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalMetaNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
11,373,206✔
4057
        break;
11,375,593✔
4058
      case STRIGGER_PULL_WAL_DATA_NEW:
7,430,387✔
4059
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalDataNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
7,430,387✔
4060
        break;
7,430,589✔
4061
      case STRIGGER_PULL_WAL_META_DATA_NEW:
3,267,498✔
4062
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalMetaDataNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
3,267,498✔
4063
        break;
3,266,345✔
4064
      case STRIGGER_PULL_WAL_CALC_DATA_NEW:
943,289✔
4065
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalCalcDataNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
943,289✔
4066
        break;
943,289✔
4067
      default:
3,969✔
4068
        vError("unknown inner msg type:%d in stream reader queue", req.base.type);
3,969✔
UNCOV
4069
        sendRsp = false;
×
UNCOV
4070
        STREAM_CHECK_RET_GOTO(TSDB_CODE_APP_ERROR);
×
4071
    }
4072
  } else {
4073
    vError("unknown msg type:%d in stream reader queue", pMsg->msgType);
×
4074
    STREAM_CHECK_RET_GOTO(TSDB_CODE_APP_ERROR);
×
4075
  }
4076
end:
31,966,627✔
4077

4078
  streamReleaseTask(taskAddr);
31,976,122✔
4079

4080
  tDestroySTriggerPullRequest(&req);
31,978,630✔
4081
  STREAM_PRINT_LOG_END(code, lino);
31,968,457✔
4082
  if (!sendRsp) {
31,974,033✔
4083
    SRpcMsg rsp = {
42,152✔
4084
      .code = code,
4085
      .pCont = pMsg->info.rsp,
21,076✔
4086
      .contLen = pMsg->info.rspLen,
21,076✔
4087
      .info = pMsg->info,
4088
    };
4089
    tmsgSendRsp(&rsp);
21,076✔
4090
  }
4091
  return code;
31,974,033✔
4092
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc