• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4995

18 Mar 2026 06:26AM UTC coverage: 71.996% (-0.2%) from 72.244%
#4995

push

travis-ci

web-flow
merge: from main to 3.0 branch #34835

2 of 4 new or added lines in 2 files covered. (50.0%)

5312 existing lines in 167 files now uncovered.

244665 of 339830 relevant lines covered (72.0%)

135013613.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.46
/source/dnode/vnode/src/vnd/vnodeStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include <stdbool.h>
17
#include <stdint.h>
18
#include <taos.h>
19
#include <tdef.h>
20
#include "executor.h"
21
#include "nodes.h"
22
#include "osMemPool.h"
23
#include "osMemory.h"
24
#include "scalar.h"
25
#include "stream.h"
26
#include "streamReader.h"
27
#include "taosdef.h"
28
#include "taoserror.h"
29
#include "tarray.h"
30
#include "tcommon.h"
31
#include "tdatablock.h"
32
#include "tdb.h"
33
#include "tdef.h"
34
#include "tencode.h"
35
#include "tglobal.h"
36
#include "thash.h"
37
#include "tlist.h"
38
#include "tlockfree.h"
39
#include "tmsg.h"
40
#include "tsimplehash.h"
41
#include "ttypes.h"
42
#include "vnd.h"
43
#include "vnode.h"
44
#include "vnodeInt.h"
45
#include "executor.h"
46

47
// Forward declaration; the definition is not in this part of the file.
// Callers here pass a reader's tag meta cache, its tag expression array and
// count, the storage API, and a table uid, with colId = 0 and lock = NULL.
int32_t cacheTag(SVnode* pVnode, SHashObj* metaCache, SExprInfo* pExprInfo, int32_t numOfExpr, SStorageAPI* api, uint64_t uid, col_id_t colId, SRWLatch* lock);
48

49
/*
 * Declares a local SStreamOptions variable named `options`, initialised from
 * the remaining arguments. startTime/endTime populate twindows.skey/.ekey.
 * The expansion already ends with ';', so it introduces a complete
 * declaration statement at the point of use.
 */
#define BUILD_OPTION(options, _suid, _ver, _order, startTime, endTime, _schemas, _isSchema, _pSlotList) \
  SStreamOptions options = {.suid = _suid,                                                               \
                            .ver = _ver,                                                                 \
                            .order = _order,                                                             \
                            .twindows = {.skey = startTime, .ekey = endTime},                            \
                            .schemas = _schemas,                                                         \
                            .isSchema = _isSchema,                                                       \
                            .pSlotList = _pSlotList};
57

58
/* One WAL meta result: an identifier plus a [skey, ekey] key range. */
typedef struct WalMetaResult {
  uint64_t id;    // identifier of the entry
  int64_t  skey;  // start key
  int64_t  ekey;  // end key
} WalMetaResult;
63

64
/*
 * Resolve the suid to use for a table.
 *
 * - Non-virtual-table stream: the reader's own suid.
 * - Virtual-table stream: pList->uid is looked up in vSetTableList.uIdMap
 *   while holding the reader's read latch, and the matching element's
 *   table->groupId is returned.
 *
 * Returns 0 when pList is NULL or the uid is not in the map.
 */
static int64_t getSuid(SStreamTriggerReaderInfo* sStreamReaderInfo, STableKeyInfo* pList) {
  if (!sStreamReaderInfo->isVtableStream) {
    return sStreamReaderInfo->suid;
  }
  if (pList == NULL) {
    return 0;
  }

  int64_t result = 0;
  taosRLockLatch(&sStreamReaderInfo->lock);
  SStreamTableMapElement* pElem = taosHashGet(sStreamReaderInfo->vSetTableList.uIdMap, &pList->uid, LONG_BYTES);
  if (pElem != NULL) {
    result = pElem->table->groupId;
  }
  taosRUnLockLatch(&sStreamReaderInfo->lock);

  return result;
}
87

88
/* Packs `type` into bits 32 and above and ORs it with `session`. */
static int64_t getSessionKey(int64_t session, int64_t type) {
  int64_t typeBits = type << 32;
  return typeBits | session;
}
6,675,143✔
89

90
/*
 * Three-way comparator over int16_t values, suitable for qsort-style APIs.
 *
 * @param lp  pointer to the left int16_t
 * @param rp  pointer to the right int16_t
 * @return -1 if *lp < *rp, 1 if *lp > *rp, 0 if equal
 *
 * The arguments are read through const-qualified pointers so the const on
 * the parameters is not cast away.
 */
int32_t sortCid(const void *lp, const void *rp) {
  const int16_t lhs = *(const int16_t*)lp;
  const int16_t rhs = *(const int16_t*)rp;

  // (a > b) - (a < b) yields exactly -1, 0 or 1 without risk of overflow.
  return (lhs > rhs) - (lhs < rhs);
}
102

103
int32_t sortSSchema(const void *lp, const void *rp) {
1,706,056✔
104
  SSchema* c1 = (SSchema*)lp;
1,706,056✔
105
  SSchema* c2 = (SSchema*)rp;
1,706,056✔
106

107
  if (c1->colId < c2->colId) {
1,706,056✔
108
    return -1;
1,696,252✔
109
  } else if (c1->colId > c2->colId) {
9,804✔
110
    return 1;
9,804✔
111
  }
112

113
  return 0;
×
114
}
115

116
/*
 * Copies one cell into column `index` of pResBlock at the row slot given by
 * the block's current info.rows. Exactly info.bytes bytes of that column are
 * copied from `data`. The row counter itself is not advanced here.
 *
 * @return 0 on success, terrno when the column cannot be fetched.
 */
static int32_t addColData(SSDataBlock* pResBlock, int32_t index, void* data) {
  SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, index);
  if (pCol != NULL) {
    memcpy(pCol->pData + pResBlock->info.rows * pCol->info.bytes, data, pCol->info.bytes);
    return 0;
  }
  return terrno;
}
125

126
/*
 * Advances the task's reader via tsdNextDataBlock, reporting through
 * *hasNext. If that call fails, tsdReaderReleaseDataBlock is invoked on the
 * reader before the error code is returned.
 */
static int32_t getTableDataInfo(SStreamReaderTaskInner* pTask, bool* hasNext) {
  int32_t rc = pTask->storageApi->tsdReader.tsdNextDataBlock(pTask->pReader, hasNext);
  if (rc == TSDB_CODE_SUCCESS) {
    return rc;
  }

  pTask->storageApi->tsdReader.tsdReaderReleaseDataBlock(pTask->pReader);
  return rc;
}
134

135
/* Forwards to the reader's tsdReaderRetrieveDataBlock, storing the result in *ppRes. */
static int32_t getTableData(SStreamReaderTaskInner* pTask, SSDataBlock** ppRes) {
  int32_t rc = pTask->storageApi->tsdReader.tsdReaderRetrieveDataBlock(pTask->pReader, ppRes);
  return rc;
}
138

139
static int32_t buildOTableInfoRsp(const SSTriggerOrigTableInfoRsp* rsp, void** data, size_t* size) {
128,361✔
140
  int32_t code = 0;
128,361✔
141
  int32_t lino = 0;
128,361✔
142
  void*   buf = NULL;
128,361✔
143
  int32_t len = tSerializeSTriggerOrigTableInfoRsp(NULL, 0, rsp);
128,361✔
144
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
128,361✔
145
  buf = rpcMallocCont(len);
128,361✔
146
  STREAM_CHECK_NULL_GOTO(buf, terrno);
128,361✔
147
  int32_t actLen = tSerializeSTriggerOrigTableInfoRsp(buf, len, rsp);
128,361✔
148
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
128,361✔
149
  *data = buf;
128,361✔
150
  *size = len;
128,361✔
151
  buf = NULL;
128,361✔
152
end:
128,361✔
153
  rpcFreeCont(buf);
128,361✔
154
  return code;
128,361✔
155
}
156

157
/*
 * Returns true when `ver` is not newer than `tableListVer`
 * (i.e. tableListVer >= ver). Both values are logged at debug level.
 */
static bool ignoreMetaChange(int64_t tableListVer, int64_t ver) {
  stDebug("%s tableListVer:%" PRId64 " ver:%" PRId64, __func__, tableListVer, ver);
  bool notNewer = (ver <= tableListVer);
  return notNewer;
}
161

162
/*
 * Decides whether a table must be (re)loaded into the reader's table list.
 *
 * Returns true only when all of the following hold:
 *   - tableType is TD_CHILD_TABLE or TD_VIRTUAL_CHILD_TABLE,
 *   - the reader's tableType is TD_SUPER_TABLE,
 *   - suid equals the reader's suid,
 *   - qStreamGetGroupIdFromOrigin (called under the read latch) returns
 *     (uint64_t)-1 for uid.
 *
 * `isCalc` is accepted but not used by this function.
 */
static bool needReLoadTableList(SStreamTriggerReaderInfo* sStreamReaderInfo, int8_t tableType, int64_t suid, int64_t uid, bool isCalc){
  bool isChild = (tableType == TD_CHILD_TABLE) || (tableType == TD_VIRTUAL_CHILD_TABLE);
  if (!isChild) {
    return false;
  }
  if (sStreamReaderInfo->tableType != TD_SUPER_TABLE) {
    return false;
  }
  if (suid != sStreamReaderInfo->suid) {
    return false;
  }

  taosRLockLatch(&sStreamReaderInfo->lock);
  uint64_t groupId = qStreamGetGroupIdFromOrigin(sStreamReaderInfo, uid);
  taosRUnLockLatch(&sStreamReaderInfo->lock);

  return groupId == (uint64_t)-1;
}
173

174
/*
 * Checks whether `uid` is present in the reader's table list and, if so,
 * stores its group id in *id.
 *
 * For a TD_SUPER_TABLE reader the lookup is skipped (returning false) when
 * `suid` differs from the reader's suid or when the table list is empty.
 * Otherwise *id is set from qStreamGetGroupIdFromOrigin; a value of
 * (uint64_t)-1 means "not found".
 *
 * This function takes no latch itself.
 *
 * Note: the trace line at the end reads *id on every path, including the
 * early exits that never assign it, so callers must pass an initialised *id.
 *
 * @return true if uid was found (and *id is valid), false otherwise.
 */
static bool uidInTableList(SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t suid, int64_t uid, uint64_t* id){
  bool ret = false;
  if (sStreamReaderInfo->tableType == TD_SUPER_TABLE) {
    if (suid != sStreamReaderInfo->suid) goto end;
    if (qStreamGetTableListNum(sStreamReaderInfo) == 0) goto end;
  }
  *id = qStreamGetGroupIdFromOrigin(sStreamReaderInfo, uid);
  // Explicit unsigned sentinel, matching the comparison in needReLoadTableList.
  if (*id == (uint64_t)-1) goto end;
  ret = true;

end:
  stTrace("%s ret:%d %p %p check suid:%" PRId64 " uid:%" PRId64 " gid:%"PRIu64, __func__, (int32_t)ret, sStreamReaderInfo, sStreamReaderInfo->tableList.gIdMap, suid, uid, *id);
  return ret;
}
188

189
/*
 * Thin wrapper over uidInTableList with identical arguments and result.
 * No latch is taken here.
 */
static bool uidInTableListOrigin(SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t suid, int64_t uid, uint64_t* id) {
  bool found = uidInTableList(sStreamReaderInfo, suid, uid, id);
  return found;
}
192

193
/*
 * Membership test performed under the reader's read latch.
 *
 * - Virtual-table stream: the pair {suid, uid} is looked up in uidHashCalc
 *   (when isCalc) or uidHashTrigger (otherwise); on a hit *id is set to uid.
 * - Otherwise: delegates to uidInTableList.
 *
 * @return true when the table is part of the relevant set.
 */
static bool uidInTableListSet(SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t suid, int64_t uid, uint64_t* id, bool isCalc) {
  bool ret = false;
  taosRLockLatch(&sStreamReaderInfo->lock);
  if (sStreamReaderInfo->isVtableStream) {
    // The hash key is the 16-byte {suid, uid} pair.
    int64_t tmp[2] = {suid, uid};
    if(tSimpleHashGet(isCalc ? sStreamReaderInfo->uidHashCalc : sStreamReaderInfo->uidHashTrigger, tmp, sizeof(tmp)) != NULL) {
      *id = uid;
      ret = true;
    }
  } else {
    ret = uidInTableList(sStreamReaderInfo, suid, uid, id);
  }

  taosRUnLockLatch(&sStreamReaderInfo->lock);
  return ret;
}
210

211
/*
 * Walks the table keys in pTableListInfo and, for each one, caches its
 * trigger tags and calc tags via cacheTag and then records uid -> groupId in
 * `tableInfo` via qStreamSetTableList.
 *
 * A cacheTag failure is logged as a warning and that table is skipped; a
 * qStreamSetTableList failure aborts the walk and is returned.
 *
 * @return 0 on success, or the qStreamSetTableList error code.
 */
static int32_t  qTransformStreamTableList(SStreamTriggerReaderInfo* sStreamReaderInfo, void* pTableListInfo, StreamTableListInfo* tableInfo){
  void*   pTask = sStreamReaderInfo->pTask;  // presumably consumed by the ST_TASK_* log macros
  SArray* pKeys = qStreamGetTableListArray(pTableListInfo);
  int32_t numKeys = taosArrayGetSize(pKeys);

  for (int32_t idx = 0; idx < numKeys; ++idx) {
    STableKeyInfo* pKey = taosArrayGet(pKeys, idx);
    if (pKey == NULL) {
      continue;
    }

    int32_t rc = cacheTag(sStreamReaderInfo->pVnode, sStreamReaderInfo->pTableMetaCacheTrigger, sStreamReaderInfo->pExprInfoTriggerTag, sStreamReaderInfo->numOfExprTriggerTag, &sStreamReaderInfo->storageApi, pKey->uid, 0, NULL);
    if (rc != 0) {
      ST_TASK_WLOG("%s cacheTag trigger failed for uid:%" PRId64",code:%d", __func__, pKey->uid, rc);
      continue;
    }

    rc = cacheTag(sStreamReaderInfo->pVnode, sStreamReaderInfo->pTableMetaCacheCalc, sStreamReaderInfo->pExprInfoCalcTag, sStreamReaderInfo->numOfExprCalcTag, &sStreamReaderInfo->storageApi, pKey->uid, 0, NULL);
    if (rc != 0) {
      ST_TASK_WLOG("%s cacheTag calc failed for uid:%" PRId64",code:%d", __func__, pKey->uid, rc);
      continue;
    }

    rc = qStreamSetTableList(tableInfo, pKey->uid, pKey->groupId);
    if (rc != 0) {
      return rc;
    }
  }
  return 0;
}
238

239
/*
 * Builds the reader's table list (sStreamReaderInfo->tableList).
 *
 * Steps: clone the partition column list, create a temporary table list via
 * qStreamCreateTableListForReader, then transfer its entries with
 * qTransformStreamTableList. The cloned node list and the temporary table
 * list are destroyed on every path.
 *
 * pTask is initialised up front so it holds a defined value on every path
 * that reaches `end:`, including the early error exits.
 *
 * @return 0 on success, otherwise the first failing step's error code.
 */
static int32_t generateTablistForStreamReader(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                   code = 0;
  int32_t                   lino = 0;
  SNodeList* groupNew = NULL;
  void* pTableListInfo = NULL;
  void* pTask = sStreamReaderInfo->pTask;  // presumably consumed by the ST_TASK_* log macros

  STREAM_CHECK_RET_GOTO(nodesCloneList(sStreamReaderInfo->partitionCols, &groupNew));

  STREAM_CHECK_RET_GOTO(qStreamCreateTableListForReader(pVnode, sStreamReaderInfo->suid, sStreamReaderInfo->uid, sStreamReaderInfo->tableType, groupNew,
                                         true, sStreamReaderInfo->pTagCond, sStreamReaderInfo->pTagIndexCond, &sStreamReaderInfo->storageApi,
                                         &pTableListInfo, sStreamReaderInfo->groupIdMap));

  STREAM_CHECK_RET_GOTO(qTransformStreamTableList(sStreamReaderInfo, pTableListInfo, &sStreamReaderInfo->tableList));

  ST_TASK_DLOG("vgId:%d %s tablelist size:%" PRIzu, TD_VID(pVnode), __func__, taosArrayGetSize(sStreamReaderInfo->tableList.pTableList));
end:
  nodesDestroyList(groupNew);
  qStreamDestroyTableList(pTableListInfo);
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
262

263
static int32_t buildVTableInfoRsp(const SStreamMsgVTableInfo* rsp, void** data, size_t* size) {
72,088✔
264
  int32_t code = 0;
72,088✔
265
  int32_t lino = 0;
72,088✔
266
  void*   buf = NULL;
72,088✔
267
  int32_t len = tSerializeSStreamMsgVTableInfo(NULL, 0, rsp);
72,088✔
268
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
72,088✔
269
  buf = rpcMallocCont(len);
72,088✔
270
  STREAM_CHECK_NULL_GOTO(buf, terrno);
72,088✔
271
  int32_t actLen = tSerializeSStreamMsgVTableInfo(buf, len, rsp);
72,088✔
272
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
72,088✔
273
  *data = buf;
72,088✔
274
  *size = len;
72,088✔
275
  buf = NULL;
72,088✔
276
end:
72,088✔
277
  rpcFreeCont(buf);
72,088✔
278
  return code;
72,088✔
279
}
280

281
static int32_t buildTsRsp(const SStreamTsResponse* tsRsp, void** data, size_t* size) {
485,456✔
282
  int32_t code = 0;
485,456✔
283
  int32_t lino = 0;
485,456✔
284
  void*   buf = NULL;
485,456✔
285
  int32_t len = tSerializeSStreamTsResponse(NULL, 0, tsRsp);
485,456✔
286
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
484,528✔
287
  buf = rpcMallocCont(len);
484,528✔
288
  STREAM_CHECK_NULL_GOTO(buf, terrno);
485,229✔
289
  int32_t actLen = tSerializeSStreamTsResponse(buf, len, tsRsp);
485,229✔
290
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
485,229✔
291
  *data = buf;
485,229✔
292
  *size = len;
485,229✔
293
  buf = NULL;
484,862✔
294
end:
484,862✔
295
  rpcFreeCont(buf);
484,862✔
296
  return code;
485,229✔
297
}
298

299

300
/*
 * Encodes one data block into a buffer obtained from rpcMallocCont.
 *
 * A NULL block or a block with zero rows is treated as success and leaves
 * *data / *size untouched. Otherwise *data receives the buffer and *size the
 * value returned by blockGetEncodeSize (not the length reported by
 * blockEncode).
 *
 * @return 0 on success, terrno on allocation or encode failure.
 */
static int32_t buildRsp(SSDataBlock* pBlock, void** data, size_t* size) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t encoded = 0;
  size_t  capacity = 0;
  void*   pCont = NULL;

  STREAM_CHECK_CONDITION_GOTO(pBlock == NULL || pBlock->info.rows == 0, TSDB_CODE_SUCCESS);

  capacity = blockGetEncodeSize(pBlock);
  pCont = rpcMallocCont(capacity);
  STREAM_CHECK_NULL_GOTO(pCont, terrno);

  encoded = blockEncode(pBlock, pCont, capacity, taosArrayGetSize(pBlock->pDataBlock));
  STREAM_CHECK_CONDITION_GOTO(encoded < 0, terrno);

  *data = pCont;
  *size = capacity;
  pCont = NULL;  // handed to the caller; the cleanup below must not free it

end:
  rpcFreeCont(pCont);
  return code;
}
317

318
/*
 * Encodes every non-empty block of pBlockList into one buffer obtained from
 * rpcMallocCont. Layout: an int32 block count followed by the encoded blocks
 * back to back. NULL blocks and blocks with zero rows are skipped.
 *
 * On success *data receives the buffer and *size is INT_BYTES plus the sum
 * of blockGetEncodeSize over the encoded blocks. On failure the buffer is
 * released with rpcFreeCont.
 *
 * Each blockEncode call is given only the capacity that is still free behind
 * the write cursor, not the size of the whole payload area.
 *
 * @return 0 on success; terrno on allocation or encode failure;
 *         TSDB_CODE_INVALID_PARA if an encode reports more bytes than remain.
 */
static int32_t buildArrayRsp(SArray* pBlockList, void** data, size_t* size) {
  int32_t code = 0;
  int32_t lino = 0;

  void*   buf = NULL;

  int32_t blockNum = 0;
  size_t  dataEncodeBufSize = 0;
  size_t  listSize = taosArrayGetSize(pBlockList);

  // Pass 1: count the blocks to encode and sum their encode sizes.
  for (size_t i = 0; i < listSize; i++) {
    SSDataBlock* pBlock = taosArrayGetP(pBlockList, i);
    if (pBlock == NULL || pBlock->info.rows == 0) continue;
    // Accumulate directly in size_t; no narrowing through an int32_t temporary.
    dataEncodeBufSize += blockGetEncodeSize(pBlock);
    blockNum++;
  }
  buf = rpcMallocCont(INT_BYTES + dataEncodeBufSize);
  STREAM_CHECK_NULL_GOTO(buf, terrno);

  char* dataBuf = (char*)buf;
  *((int32_t*)(dataBuf)) = blockNum;
  dataBuf += INT_BYTES;

  // Pass 2: encode each block behind the previous one.
  size_t remain = dataEncodeBufSize;
  for (size_t i = 0; i < listSize; i++) {
    SSDataBlock* pBlock = taosArrayGetP(pBlockList, i);
    if (pBlock == NULL || pBlock->info.rows == 0) continue;
    int32_t actualLen = blockEncode(pBlock, dataBuf, remain, taosArrayGetSize(pBlock->pDataBlock));
    STREAM_CHECK_CONDITION_GOTO(actualLen < 0, terrno);
    // Guard the unsigned subtraction below against wrap-around.
    STREAM_CHECK_CONDITION_GOTO((size_t)actualLen > remain, TSDB_CODE_INVALID_PARA);
    dataBuf += actualLen;
    remain -= (size_t)actualLen;
  }
  *data = buf;
  *size = INT_BYTES + dataEncodeBufSize;
  buf = NULL;  // handed to the caller; the cleanup below must not free it
end:
  rpcFreeCont(buf);
  return code;
}
353

354
/*
 * Writes one WAL meta row into pBlock at the block's current row slot, one
 * addColData call per column, in this order:
 *   type, [id — only when !isVTable], uid, skey, ekey, ver, rows.
 * The block's row counter is not advanced here.
 *
 * @return 0 on success, otherwise the first addColData error.
 */
static int32_t buildWalMetaBlock(SSDataBlock* pBlock, int8_t type, int64_t id, bool isVTable, int64_t uid,
                                 int64_t skey, int64_t ekey, int64_t ver, int64_t rows) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t col = 0;

  STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &type));
  if (!isVTable) {
    // Non-virtual tables carry an extra id column right after the type.
    STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &id));
  }
  STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &uid));
  STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &skey));
  STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &ekey));
  STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &ver));
  STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &rows));

end:
  return code;
}
373

374
/*
 * Writes one row into pBlock at the block's current row slot, filling
 * columns 0..3 with id, skey, ekey and ver in that order. The block's row
 * counter is not advanced here.
 *
 * @return 0 on success, otherwise the first addColData error.
 */
static int32_t buildWalMetaBlockNew(SSDataBlock* pBlock, int64_t id, int64_t skey, int64_t ekey, int64_t ver) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t col = 0;

  STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &id));
  STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &skey));
  STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &ekey));
  STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &ver));

end:
  return code;
}
386

387
/*
 * Writes one row into pBlock at the block's current row slot, filling
 * columns 0..2 with id, ver and type in that order. The block's row counter
 * is not advanced here.
 *
 * addColData copies exactly the column's byte width from the given address.
 * The type column is created by createBlockForProcessMeta in this file as
 * TSDB_DATA_TYPE_TINYINT / CHAR_BYTES, so the enum is first narrowed into an
 * int8_t and that is what gets copied. Copying from the enum object directly
 * would pick up its first byte in memory, which holds the value only on
 * little-endian hosts.
 * NOTE(review): assumes every block passed here uses that schema — confirm
 * no caller supplies a block with a wider type column.
 *
 * @return 0 on success, otherwise the first addColData error.
 */
static int32_t buildTableBlock(SSDataBlock* pBlock, int64_t id, int64_t ver, ETableBlockType type) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t index = 0;
  int8_t  typeVal = (int8_t)type;

  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &id));
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &ver));
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &typeVal));

end:
  return code;
}
398

399
/*
 * Fills pTSchema as a single-column schema: numOfCols = 1, the given
 * version, and columns[0] set to the given colId, type and bytes.
 */
static void buildTSchema(STSchema* pTSchema, int32_t ver, col_id_t colId, int8_t type, int32_t bytes) {
  // Schema-level fields.
  pTSchema->version = ver;
  pTSchema->numOfCols = 1;

  // The only column.
  pTSchema->columns[0].bytes = bytes;
  pTSchema->columns[0].type = type;
  pTSchema->columns[0].colId = colId;
}
×
406

407
/*
 * Decodes a delete record (SDeleteRes) from `data` and appends one row
 * {id, skey, ekey, ver} to rsp->deleteBlock for each uid that belongs to the
 * reader's table set (uidInTableListSet with isCalc = false). Each appended
 * row also increments rsp->totalRows.
 *
 * For a non-vtable super-table reader, records whose suid differs from the
 * reader's suid are ignored entirely.
 *
 * The uid array is released and the decoder cleared on every path.
 *
 * @return 0 on success or when the record is ignored; otherwise the error
 *         code of the failing step (terrno for allocation failures).
 */
static int32_t scanDeleteDataNew(SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* rsp, void* data, int32_t len,
                              int64_t ver) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SDecoder   decoder = {0};
  SDeleteRes req = {0};
  void* pTask = sStreamReaderInfo->pTask;

  // Initialise the decoder first so the cleanup at `end:` always sees an
  // initialised decoder, then make sure the uid array really exists.
  tDecoderInit(&decoder, data, len);
  req.uidList = taosArrayInit(0, sizeof(tb_uid_t));
  STREAM_CHECK_NULL_GOTO(req.uidList, terrno);

  STREAM_CHECK_RET_GOTO(tDecodeDeleteRes(&decoder, &req));
  STREAM_CHECK_CONDITION_GOTO((sStreamReaderInfo->tableType == TSDB_SUPER_TABLE && !sStreamReaderInfo->isVtableStream && req.suid != sStreamReaderInfo->suid), TDB_CODE_SUCCESS);

  for (int32_t i = 0; i < taosArrayGetSize(req.uidList); i++) {
    uint64_t* uid = taosArrayGet(req.uidList, i);
    STREAM_CHECK_NULL_GOTO(uid, terrno);
    uint64_t   id = 0;
    ST_TASK_DLOG("stream reader scan delete start data:uid %" PRIu64 ", skey %" PRIu64 ", ekey %" PRIu64, *uid, req.skey, req.ekey);
    // NOTE(review): the first uid that is not in the table set ends the whole
    // scan (goto end), so later uids in the same record are never examined.
    // Confirm this is intended rather than a per-uid `continue`.
    STREAM_CHECK_CONDITION_GOTO(!uidInTableListSet(sStreamReaderInfo, req.suid, *uid, &id, false), TDB_CODE_SUCCESS);
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->deleteBlock, ((SSDataBlock*)rsp->deleteBlock)->info.rows + 1));
    STREAM_CHECK_RET_GOTO(buildWalMetaBlockNew(rsp->deleteBlock, id, req.skey, req.ekey, ver));
    ((SSDataBlock*)rsp->deleteBlock)->info.rows++;
    rsp->totalRows++;
  }

end:
  taosArrayDestroy(req.uidList);
  tDecoderClear(&decoder);
  return code;
}
437

438
/*
 * Creates a three-column data block for table meta rows and stores it in
 * *pBlock. Columns, in order:
 *   0: BIGINT  (LONG_BYTES)  - gid for non-vtable / uid for vtable
 *   1: BIGINT  (LONG_BYTES)  - ver
 *   2: TINYINT (CHAR_BYTES)  - type
 *
 * The temporary schema array is destroyed on every path.
 *
 * @return 0 on success, terrno if the schema array cannot be allocated,
 *         otherwise the error of the failing build step.
 */
static int32_t createBlockForProcessMeta(SSDataBlock** pBlock) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t slot = 0;
  SArray* pSchemas = taosArrayInit(8, sizeof(SSchema));

  STREAM_CHECK_NULL_GOTO(pSchemas, terrno);

  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(pSchemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, slot++));   // gid non vtable/uid vtable
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(pSchemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, slot++));   // ver
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(pSchemas, TSDB_DATA_TYPE_TINYINT, CHAR_BYTES, slot++));  // type

  STREAM_CHECK_RET_GOTO(createDataBlockForStream(pSchemas, pBlock));

end:
  taosArrayDestroy(pSchemas);
  return code;
}
457

458
/*
 * Appends one {id, ver, type} row to the block at *tmp, creating the block
 * through createBlockForProcessMeta when *tmp is still NULL. The block's row
 * counter is advanced only after the row has been written.
 *
 * @return 0 on success, otherwise the error of the failing step.
 */
static int32_t addOneRow(void** tmp, int64_t id, int64_t ver, ETableBlockType type) {
  int32_t  code = 0;
  int32_t  lino = 0;

  if (*tmp == NULL) {
    STREAM_CHECK_RET_GOTO(createBlockForProcessMeta((SSDataBlock**)tmp));
  }

  SSDataBlock* pBlk = (SSDataBlock*)(*tmp);
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlk, pBlk->info.rows + 1));
  STREAM_CHECK_RET_GOTO(buildTableBlock(pBlk, id, ver, type));
  pBlk->info.rows++;

end:
  return code;
}
471

472
/*
 * Appends one row per uid in uidListAdd to *block (via addOneRow) with the
 * given ver and type, incrementing *totalRows for each row added. NULL
 * entries returned by taosArrayGet are skipped.
 *
 * @return 0 on success, otherwise the first addOneRow error.
 */
static int32_t addUidListToBlock(SArray* uidListAdd, void** block, int64_t ver, int32_t* totalRows, ETableBlockType type) {
  int32_t numUids = taosArrayGetSize(uidListAdd);

  for (int32_t idx = 0; idx < numUids; ++idx) {
    uint64_t* pUid = taosArrayGet(uidListAdd, idx);
    if (pUid != NULL) {
      int32_t rc = addOneRow(block, *pUid, ver, type);
      if (rc != 0) {
        return rc;
      }
      (*totalRows)++;
    }
  }
  return 0;
}
486

487
/*
 * Collects into uidListAdd the uids from tableListAdd that are NOT yet in
 * the reader's tableList.uIdMap. The scan runs under the reader's read
 * latch. Does nothing when uidListAdd is NULL.
 *
 * @return 0 on success, terrno if pushing onto uidListAdd fails.
 */
static int32_t qStreamGetAddTable(SStreamTriggerReaderInfo* sStreamReaderInfo, SArray* tableListAdd, SArray* uidListAdd) {
  int32_t      code = 0;
  int32_t      lino = 0;

  if (uidListAdd == NULL) {
    return 0;
  }
  void* pTask = sStreamReaderInfo->pTask;  // presumably consumed by the ST_TASK_* log macros

  taosRLockLatch(&sStreamReaderInfo->lock);
  int32_t numKeys = taosArrayGetSize(tableListAdd);
  for (int32_t idx = 0; idx < numKeys; ++idx) {
    STableKeyInfo* pKey = taosArrayGet(tableListAdd, idx);
    if (pKey == NULL) {
      continue;
    }
    bool alreadyKnown = (taosHashGet(sStreamReaderInfo->tableList.uIdMap, &pKey->uid, LONG_BYTES) != NULL);
    if (alreadyKnown) {
      continue;
    }
    STREAM_CHECK_NULL_GOTO(taosArrayPush(uidListAdd, &pKey->uid), terrno);
    ST_TASK_WLOG("%s real add table to list for uid:%" PRId64, __func__, pKey->uid);
  }

end:
  taosRUnLockLatch(&sStreamReaderInfo->lock);
  return code;
}
513

514
/*
 * Collects into uidListDel the uids from tableListDel that ARE currently in
 * the reader's tableList.uIdMap. The scan runs under the reader's read
 * latch. Does nothing when uidListDel is NULL.
 *
 * @return 0 on success, terrno if pushing onto uidListDel fails.
 */
static int32_t qStreamGetDelTable(SStreamTriggerReaderInfo* sStreamReaderInfo, SArray* tableListDel, SArray* uidListDel) {
  int32_t      code = 0;
  int32_t      lino = 0;

  if (uidListDel == NULL) {
    return 0;
  }
  void* pTask = sStreamReaderInfo->pTask;  // presumably consumed by the ST_TASK_* log macros

  taosRLockLatch(&sStreamReaderInfo->lock);
  int32_t numUids = taosArrayGetSize(tableListDel);
  for (int32_t idx = 0; idx < numUids; ++idx) {
    int64_t* pUid = taosArrayGet(tableListDel, idx);
    if (pUid == NULL) {
      continue;
    }
    bool known = (taosHashGet(sStreamReaderInfo->tableList.uIdMap, pUid, LONG_BYTES) != NULL);
    if (!known) {
      continue;
    }
    STREAM_CHECK_NULL_GOTO(taosArrayPush(uidListDel, pUid), terrno);
    ST_TASK_WLOG("%s real del table from list for uid:%" PRId64, __func__, *pUid);
  }

end:
  taosRUnLockLatch(&sStreamReaderInfo->lock);
  return code;
}
540

541
/*
 * Decodes a drop-table batch from `data` and records the affected tables in
 * rsp->tableBlock.
 *
 * For every dropped table that uidInTableListOrigin reports as belonging to
 * the reader:
 *   - if the reader's deleteOutTbl is non-zero, its uid is queued for a
 *     TABLE_BLOCK_DROP row;
 *   - if the reader is a vtable stream, its uid is queued as a retire
 *     candidate.
 * After the loop the DROP rows are emitted; for vtable streams the retire
 * candidates are filtered through qStreamGetDelTable and emitted as
 * TABLE_BLOCK_RETIRE rows.
 *
 * All temporary arrays are destroyed and the decoder cleared on every path.
 *
 * @return 0 on success, otherwise the error of the failing step.
 */
static int32_t scanDropTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* rsp, void* data, int32_t len,
                             int64_t ver) {
  int32_t          code = 0;
  int32_t          lino = 0;
  void*            pTask = sStreamReaderInfo->pTask;  // presumably consumed by the ST_TASK_* log macros
  SArray*          vtbUids = NULL;     // uids dropped while in vtable-stream mode
  SArray*          retireUids = NULL;  // subset of vtbUids still present in the table list
  SArray*          outTblUids = NULL;  // uids queued for TABLE_BLOCK_DROP
  SDecoder         decoder = {0};
  SVDropTbBatchReq req = {0};

  tDecoderInit(&decoder, data, len);
  STREAM_CHECK_RET_GOTO(tDecodeSVDropTbBatchReq(&decoder, &req));

  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    SVDropTbReq* pDrop = req.pReqs + iReq;
    STREAM_CHECK_NULL_GOTO(pDrop, TSDB_CODE_INVALID_PARA);

    uint64_t id = 0;
    bool     tracked = uidInTableListOrigin(sStreamReaderInfo, pDrop->suid, pDrop->uid, &id);
    if (!tracked) {
      continue;
    }

    if (sStreamReaderInfo->deleteOutTbl != 0) {
      if (outTblUids == NULL) {
        outTblUids = taosArrayInit(8, sizeof(tb_uid_t));
        STREAM_CHECK_NULL_GOTO(outTblUids, terrno);
      }
      STREAM_CHECK_NULL_GOTO(taosArrayPush(outTblUids, &pDrop->uid), terrno);
    }

    if (sStreamReaderInfo->isVtableStream) {
      if (vtbUids == NULL) {
        vtbUids = taosArrayInit(8, sizeof(tb_uid_t));
        STREAM_CHECK_NULL_GOTO(vtbUids, terrno);
      }
      STREAM_CHECK_NULL_GOTO(taosArrayPush(vtbUids, &pDrop->uid), terrno);
    }

    ST_TASK_DLOG("stream reader scan drop uid %" PRId64 ", id %" PRIu64, pDrop->uid, id);
  }

  STREAM_CHECK_RET_GOTO(addUidListToBlock(outTblUids, &rsp->tableBlock, ver, &rsp->totalRows, TABLE_BLOCK_DROP));

  if (sStreamReaderInfo->isVtableStream) {
    retireUids = taosArrayInit(8, sizeof(tb_uid_t));
    STREAM_CHECK_NULL_GOTO(retireUids, terrno);
    STREAM_CHECK_RET_GOTO(qStreamGetDelTable(sStreamReaderInfo, vtbUids, retireUids));
    STREAM_CHECK_RET_GOTO(addUidListToBlock(retireUids, &rsp->tableBlock, ver, &rsp->totalRows, TABLE_BLOCK_RETIRE));
  }

end:
  taosArrayDestroy(vtbUids);
  taosArrayDestroy(retireUids);
  taosArrayDestroy(outTblUids);
  tDecoderClear(&decoder);
  return code;
}
595

596
/*
 * Applies removals and additions to the reader's table list while holding
 * the reader's write latch.
 *
 * 1. Every uid in tableListDel is removed via qStreamRemoveTableList.
 * 2. For every entry in tableListAdd, trigger tags and calc tags are cached
 *    via cacheTag; if either fails the entry is logged and skipped.
 *    Otherwise any existing mapping for the uid is removed and the
 *    uid -> groupId mapping is (re)inserted.
 *
 * @return 0 on success, otherwise the first remove/set error.
 */
static int32_t qStreamModifyTableList(SStreamTriggerReaderInfo* sStreamReaderInfo, SArray* tableListAdd, SArray* tableListDel) {
  int32_t      code = 0;
  int32_t      lino = 0;
  void* pTask = sStreamReaderInfo->pTask;  // presumably consumed by the ST_TASK_* log macros

  taosWLockLatch(&sStreamReaderInfo->lock);

  // Phase 1: removals.
  int32_t numDel = taosArrayGetSize(tableListDel);
  for (int32_t idx = 0; idx < numDel; ++idx) {
    int64_t* pUid = taosArrayGet(tableListDel, idx);
    if (pUid != NULL) {
      STREAM_CHECK_RET_GOTO(qStreamRemoveTableList(&sStreamReaderInfo->tableList, *pUid));
    }
  }

  // Phase 2: additions.
  int32_t numAdd = taosArrayGetSize(tableListAdd);
  for (int32_t idx = 0; idx < numAdd; ++idx) {
    STableKeyInfo* pKey = taosArrayGet(tableListAdd, idx);
    if (pKey == NULL) {
      continue;
    }

    int rc = cacheTag(sStreamReaderInfo->pVnode, sStreamReaderInfo->pTableMetaCacheTrigger, sStreamReaderInfo->pExprInfoTriggerTag, sStreamReaderInfo->numOfExprTriggerTag, &sStreamReaderInfo->storageApi, pKey->uid, 0, NULL);
    if (rc != 0) {
      ST_TASK_WLOG("%s cacheTag trigger failed for uid:%" PRId64",code:%d", __func__, pKey->uid, rc);
      continue;
    }

    rc = cacheTag(sStreamReaderInfo->pVnode, sStreamReaderInfo->pTableMetaCacheCalc, sStreamReaderInfo->pExprInfoCalcTag, sStreamReaderInfo->numOfExprCalcTag, &sStreamReaderInfo->storageApi, pKey->uid, 0, NULL);
    if (rc != 0) {
      ST_TASK_WLOG("%s cacheTag calc failed for uid:%" PRId64",code:%d", __func__, pKey->uid, rc);
      continue;
    }

    // Remove first so the set below always installs the new mapping.
    STREAM_CHECK_RET_GOTO(qStreamRemoveTableList(&sStreamReaderInfo->tableList, pKey->uid));
    STREAM_CHECK_RET_GOTO(qStreamSetTableList(&sStreamReaderInfo->tableList, pKey->uid, pKey->groupId));
  }

end:
  taosWUnLockLatch(&sStreamReaderInfo->lock);
  return code;
}
635

636
/*
 * Runs the given uids through qStreamFilterTableListForReader using the
 * reader's tag conditions and a fresh clone of its partition columns; the
 * resulting list is stored in *tableList. An empty uidList returns 0 at once
 * and leaves *tableList untouched.
 *
 * The cloned node list is destroyed on every path that created it.
 *
 * @return 0 on success, otherwise the clone or filter error.
 */
static int32_t processTableList(SStreamTriggerReaderInfo* sStreamReaderInfo, SArray* uidList, SArray** tableList) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SNodeList* pPartCols = NULL;

  if (taosArrayGetSize(uidList) == 0) {
    return 0;
  }

  STREAM_CHECK_RET_GOTO(nodesCloneList(sStreamReaderInfo->partitionCols, &pPartCols));
  STREAM_CHECK_RET_GOTO(qStreamFilterTableListForReader(sStreamReaderInfo->pVnode, uidList, pPartCols, sStreamReaderInfo->pTagCond,
                                                    sStreamReaderInfo->pTagIndexCond, &sStreamReaderInfo->storageApi,
                                                    sStreamReaderInfo->groupIdMap, sStreamReaderInfo->suid, tableList));

end:
  nodesDestroyList(pPartCols);
  return code;
}
653

654
/*
 * Decodes a create-table batch from `data` and folds the relevant new tables
 * into the reader's table list.
 *
 * Steps:
 *   1. Collect the uids of requests for which needReLoadTableList is true.
 *   2. Filter them through processTableList.
 *   3. For vtable streams, emit TABLE_BLOCK_ADD rows into rsp->tableBlock for
 *      the tables that are genuinely new (qStreamGetAddTable).
 *   4. Update the reader's table list via qStreamModifyTableList, passing the
 *      filtered list as additions and the collected uids as removals.
 *
 * All temporaries, the decoded request and the decoder are released on every
 * path.
 *
 * @return 0 on success, otherwise the error of the failing step.
 */
static int32_t scanCreateTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* rsp, void* data, int32_t len,
                             int64_t ver) {
  int32_t            code = 0;
  int32_t            lino = 0;
  void*              pTask = sStreamReaderInfo->pTask;  // presumably consumed by the ST_TASK_* log macros
  SArray*            uidList = NULL;     // uids that need a reload
  SArray*            tableList = NULL;   // filtered result from processTableList
  SArray*            uidListAdd = NULL;  // vtable streams only: uids to report as added
  SDecoder           decoder = {0};
  SVCreateTbBatchReq req = {0};

  tDecoderInit(&decoder, data, len);
  STREAM_CHECK_RET_GOTO(tDecodeSVCreateTbBatchReq(&decoder, &req));

  uidList = taosArrayInit(8, sizeof(tb_uid_t));
  STREAM_CHECK_NULL_GOTO(uidList, terrno);

  if (sStreamReaderInfo->isVtableStream) {
    uidListAdd = taosArrayInit(8, sizeof(tb_uid_t));
    STREAM_CHECK_NULL_GOTO(uidListAdd, terrno);
  }

  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    SVCreateTbReq* pCreateReq = req.pReqs + iReq;
    bool reload = needReLoadTableList(sStreamReaderInfo, pCreateReq->type, pCreateReq->ctb.suid, pCreateReq->uid, false);
    if (!reload) {
      ST_TASK_DLOG("stream reader scan create table jump, %s", pCreateReq->name);
      continue;
    }
    ST_TASK_ILOG("stream reader scan create table %s", pCreateReq->name);
    STREAM_CHECK_NULL_GOTO(taosArrayPush(uidList, &pCreateReq->uid), terrno);
  }

  STREAM_CHECK_RET_GOTO(processTableList(sStreamReaderInfo, uidList, &tableList));
  STREAM_CHECK_RET_GOTO(qStreamGetAddTable(sStreamReaderInfo, tableList, uidListAdd));
  STREAM_CHECK_RET_GOTO(addUidListToBlock(uidListAdd, &rsp->tableBlock, ver, &rsp->totalRows, TABLE_BLOCK_ADD));

  STREAM_CHECK_RET_GOTO(qStreamModifyTableList(sStreamReaderInfo, tableList, uidList));

end:
  taosArrayDestroy(uidList);
  taosArrayDestroy(uidListAdd);
  taosArrayDestroy(tableList);
  tDeleteSVCreateTbBatchReq(&req);
  tDecoderClear(&decoder);
  return code;
}
701

702
// Handles the create-table request embedded in an auto-create submit entry.
// If the table is relevant to this stream reader and the meta change at `ver`
// is not ignored, the table list is re-processed for the new uid and the
// reader's table list is updated. Returns 0 or the first failing step's code.
static int32_t processAutoCreateTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, SVCreateTbReq* pCreateReq, int64_t ver) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   pTask = sStreamReaderInfo->pTask;  // referenced by the ST_TASK_* log macros
  SArray* newUids = NULL;
  SArray* newTables = NULL;

  ST_TASK_DLOG("%s start, name:%s uid:%"PRId64, __func__, pCreateReq->name, pCreateReq->uid);

  // ignoreMetaChange() is only consulted when the table is relevant at all.
  bool skip = !needReLoadTableList(sStreamReaderInfo, pCreateReq->type, pCreateReq->ctb.suid, pCreateReq->uid, false);
  if (!skip) {
    skip = ignoreMetaChange(sStreamReaderInfo->tableList.version, ver);
  }
  if (skip) {
    ST_TASK_DLOG("stream reader scan auto create table jump, %s", pCreateReq->name);
    goto end;
  }

  newUids = taosArrayInit(8, sizeof(tb_uid_t));
  STREAM_CHECK_NULL_GOTO(newUids, terrno);
  STREAM_CHECK_NULL_GOTO(taosArrayPush(newUids, &pCreateReq->uid), terrno);
  ST_TASK_DLOG("stream reader scan auto create table %s", pCreateReq->name);

  STREAM_CHECK_RET_GOTO(processTableList(sStreamReaderInfo, newUids, &newTables));
  STREAM_CHECK_RET_GOTO(qStreamModifyTableList(sStreamReaderInfo, newTables, newUids));

end:
  taosArrayDestroy(newUids);
  taosArrayDestroy(newTables);
  return code;
}
727

728
// Returns true when some entry of `colList` wraps a column node whose colId
// equals `cid`. Each list item is treated as an STargetNode and its pExpr is
// inspected; items whose expression is missing or is not a column are skipped.
static bool isColIdInList(SNodeList* colList, col_id_t cid) {
  SNode* nodeItem = NULL;
  FOREACH(nodeItem, colList) {
    SNode* pExpr = ((STargetNode*)nodeItem)->pExpr;
    // nodeType() dereferences its argument, so guard against an empty target.
    if (pExpr == NULL || nodeType(pExpr) != QUERY_NODE_COLUMN) {
      continue;
    }
    if (((SColumnNode*)pExpr)->colId == cid) {
      return true;
    }
  }
  return false;
}
744

745
// True for the table types whose ALTER TABLE entries this reader reacts to:
// child tables, virtual child tables and virtual normal tables.
static bool isAlteredTable(ETableType tbType) {
  switch (tbType) {
    case TSDB_CHILD_TABLE:
    case TSDB_VIRTUAL_CHILD_TABLE:
    case TSDB_VIRTUAL_NORMAL_TABLE:
      return true;
    default:
      return false;
  }
}
748

749
// Looks up the column named `colName` in the schema of table `uid` and, when
// found, stores its column id in *colId. *colId is left untouched when the
// schema cannot be loaded or no column matches, so callers should pre-set it.
// Names are compared over at most TSDB_COL_NAME_LEN bytes.
void getAlterColId(void* pVnode, int64_t uid, const char* colName, col_id_t* colId) {
  SVnode*         vnode = (SVnode*)pVnode;
  SSchemaWrapper* schema = metaGetTableSchema(vnode->pMeta, uid, -1, 1, NULL, 0);
  if (schema == NULL) {
    return;
  }

  int32_t pos = 0;
  while (pos < schema->nCols && strncmp(schema->pSchema[pos].name, colName, TSDB_COL_NAME_LEN) != 0) {
    pos++;
  }
  if (pos < schema->nCols) {
    *colId = schema->pSchema[pos].colId;
  }

  tDeleteSchemaWrapper(schema);
}
763

764
// Placeholder predicate: currently accepts every alter request.
// Declared with (void) so the definition is a real prototype; before C23 an
// empty parameter list leaves the argument types unchecked.
static bool checkAlterCondition(void) {
  return true;
}
767

768
// Processes one ALTER TABLE WAL entry for the stream reader.
//
// Only tag-value updates and virtual-table column-reference changes are
// considered; everything else returns success without further work. The
// altered table must be a child / virtual child / virtual normal table whose
// suid matches the reader's suid.
//  - Column-reference change: if the table is in the reader's origin table
//    list and the column is a trigger column (or isVtableOnlyTs is set), the
//    uid is reported in rsp->tableBlock as TABLE_BLOCK_ADD.
//  - Tag-value update: the table list is re-processed for the uid; added and
//    retired uids are reported for vtable streams and the reader's table list
//    is updated. When rsp->checkAlter is set and data rows are already
//    pending, the function instead flags rsp->needReturn and steps rsp->ver
//    back by one so the entry is scanned again.
static int32_t scanAlterTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* rsp, void* data, int32_t len, int64_t ver) {
  int32_t      code = 0;
  int32_t      lino = 0;
  void*        pTask = sStreamReaderInfo->pTask;  // referenced by the ST_TASK_* log macros
  SDecoder     decoder = {0};
  SVAlterTbReq req = {0};
  SArray*      uidList = NULL;
  SArray*      uidListAdd = NULL;
  SArray*      uidListDel = NULL;
  SArray*      tableList = NULL;
  ETableType   tbType = 0;
  uint64_t     suid = 0;
  uint64_t     uid = 0;

  ST_TASK_DLOG("%s start", __func__);

  tDecoderInit(&decoder, data, len);
  STREAM_CHECK_RET_GOTO(tDecodeSVAlterTbReq(&decoder, &req));

  // TODO:
  // 1. TSDB_ALTER_TABLE_UPDATE_TAG_VAL and TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL are no longer used.
  // 2. TSDB_ALTER_TABLE_UPDATE_MULTI_TABLE_TAG_VAL and TSDB_ALTER_TABLE_UPDATE_CHILD_TABLE_TAG_VAL were
  //    added; both support updating tag values for multiple tables.
  const bool isTagValUpdate =
      (req.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) || (req.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL);
  const bool isColRefChange =
      (req.action == TSDB_ALTER_TABLE_ALTER_COLUMN_REF) || (req.action == TSDB_ALTER_TABLE_REMOVE_COLUMN_REF);
  STREAM_CHECK_CONDITION_GOTO(!isTagValUpdate && !isColRefChange, TDB_CODE_SUCCESS);

  // NOTE(review): only TABLE_NOT_EXIST is handled explicitly for the two meta
  // lookups below; other error codes are not propagated here.
  code = metaGetTableTypeSuidByName(sStreamReaderInfo->pVnode, req.tbName, &tbType, &suid);
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
    code = 0;
    ST_TASK_WLOG("stream reader scan alter table %s not exist, metaGetTableTypeSuidByName", req.tbName);
    goto end;
  }
  STREAM_CHECK_CONDITION_GOTO(!isAlteredTable(tbType), TDB_CODE_SUCCESS);
  STREAM_CHECK_CONDITION_GOTO(suid != sStreamReaderInfo->suid, TDB_CODE_SUCCESS);

  // The "added uid" list is only allocated for vtable streams.
  if (sStreamReaderInfo->isVtableStream) {
    uidListAdd = taosArrayInit(8, sizeof(tb_uid_t));
    STREAM_CHECK_NULL_GOTO(uidListAdd, terrno);
  }

  code = metaGetTableUidByName(sStreamReaderInfo->pVnode, req.tbName, &uid);
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
    code = 0;
    ST_TASK_WLOG("stream reader scan alter table %s not exist, metaGetTableUidByName", req.tbName);
    goto end;
  }

  if (isColRefChange) {
    uint64_t id = 0;
    col_id_t colId = 0;

    STREAM_CHECK_CONDITION_GOTO(!uidInTableListOrigin(sStreamReaderInfo, suid, uid, &id), TDB_CODE_SUCCESS);
    getAlterColId(sStreamReaderInfo->pVnode, uid, req.colName, &colId);
    if (atomic_load_8(&sStreamReaderInfo->isVtableOnlyTs) == 0 && !isColIdInList(sStreamReaderInfo->triggerCols, colId)) {    //todo calc cols
      ST_TASK_ILOG("stream reader scan alter table %s, colId %d not in trigger cols", req.tbName, colId);
      goto end;
    }
    STREAM_CHECK_NULL_GOTO(taosArrayPush(uidListAdd, &uid), terrno);
    STREAM_CHECK_RET_GOTO(addUidListToBlock(uidListAdd, &rsp->tableBlock, ver, &rsp->totalRows, TABLE_BLOCK_ADD));
  } else {
    uidList = taosArrayInit(8, sizeof(tb_uid_t));
    STREAM_CHECK_NULL_GOTO(uidList, terrno);

    uidListDel = taosArrayInit(8, sizeof(tb_uid_t));
    STREAM_CHECK_NULL_GOTO(uidListDel, terrno);

    STREAM_CHECK_NULL_GOTO(taosArrayPush(uidList, &uid), terrno);
    STREAM_CHECK_RET_GOTO(processTableList(sStreamReaderInfo, uidList, &tableList));
    STREAM_CHECK_RET_GOTO(qStreamGetDelTable(sStreamReaderInfo, uidList, uidListDel));

    // Data rows collected so far must be returned before a table is retired:
    // step the version back so this entry is scanned again on the next call.
    if (rsp->checkAlter && taosArrayGetSize(uidListDel) > 0 && rsp->totalDataRows > 0) {
      rsp->needReturn = true;
      rsp->ver--;
      ST_TASK_DLOG("stream reader scan alter table %s need return data again for uid %" PRId64, req.tbName, uid);
      goto end;
    }

    STREAM_CHECK_RET_GOTO(qStreamGetAddTable(sStreamReaderInfo, tableList, uidListAdd));
    if (sStreamReaderInfo->isVtableStream) {
      STREAM_CHECK_RET_GOTO(addUidListToBlock(uidListAdd, &rsp->tableBlock, ver, &rsp->totalRows, TABLE_BLOCK_ADD));
      STREAM_CHECK_RET_GOTO(addUidListToBlock(uidListDel, &rsp->tableBlock, ver, &rsp->totalRows, TABLE_BLOCK_RETIRE));
    }
    STREAM_CHECK_RET_GOTO(qStreamModifyTableList(sStreamReaderInfo, tableList, uidList));
  }
  ST_TASK_DLOG("stream reader scan alter table %s", req.tbName);

end:
  taosArrayDestroy(uidList);
  taosArrayDestroy(uidListAdd);
  taosArrayDestroy(uidListDel);
  taosArrayDestroy(tableList);

  // Release the arrays owned by the decoded request.
  taosArrayDestroy(req.pMultiTag);
  for (int32_t iTable = 0; iTable < taosArrayGetSize(req.tables); iTable++) {
    SUpdateTableTagVal* pEntry = taosArrayGet(req.tables, iTable);
    taosArrayDestroy(pEntry->tags);
  }
  taosArrayDestroy(req.tables);

  tDecoderClear(&decoder);
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  return code;
}
867

868
// static int32_t scanAlterSTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, void* data, int32_t len) {
869
//   int32_t  code = 0;
870
//   int32_t  lino = 0;
871
//   SDecoder decoder = {0};
872
//   SMAlterStbReq reqAlter = {0};
873
//   SVCreateStbReq req = {0};
874
//   tDecoderInit(&decoder, data, len);
875
//   void* pTask = sStreamReaderInfo->pTask;
876
  
877
//   STREAM_CHECK_RET_GOTO(tDecodeSVCreateStbReq(&decoder, &req));
878
//   STREAM_CHECK_CONDITION_GOTO(req.suid != sStreamReaderInfo->suid, TDB_CODE_SUCCESS);
879
//   if (req.alterOriData != 0) {
880
//     STREAM_CHECK_RET_GOTO(tDeserializeSMAlterStbReq(req.alterOriData, req.alterOriDataLen, &reqAlter));
881
//     STREAM_CHECK_CONDITION_GOTO(reqAlter.alterType != TSDB_ALTER_TABLE_DROP_TAG && reqAlter.alterType != TSDB_ALTER_TABLE_UPDATE_TAG_NAME, TDB_CODE_SUCCESS);
882
//   }
883
  
884
//   STREAM_CHECK_RET_GOTO(processTableList(sStreamReaderInfo));
885

886
//   ST_TASK_DLOG("stream reader scan alter suid %" PRId64, req.suid);
887
// end:
888
//   tFreeSMAltertbReq(&reqAlter);
889
//   tDecoderClear(&decoder);
890
//   return code;
891
// }
892

893
// static int32_t scanDropSTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, void* data, int32_t len) {
894
//   int32_t  code = 0;
895
//   int32_t  lino = 0;
896
//   SDecoder decoder = {0};
897
//   void* pTask = sStreamReaderInfo->pTask;
898

899
//   SVDropStbReq req = {0};
900
//   tDecoderInit(&decoder, data, len);
901
//   STREAM_CHECK_RET_GOTO(tDecodeSVDropStbReq(&decoder, &req));
902
//   STREAM_CHECK_CONDITION_GOTO(req.suid != sStreamReaderInfo->suid, TDB_CODE_SUCCESS);
903

904
//   ST_TASK_DLOG("stream reader scan drop suid %" PRId64, req.suid);
905
// end:
906
//   tDecoderClear(&decoder);
907
//   return code;
908
// }
909

910
// Decodes one per-table section of a submit message and records the time range
// it covers into `gidHash`.
//
// The section is skipped (success, nothing recorded) when its suid/uid pair is
// not in the reader's table-list set. Otherwise the first and last row
// timestamps are read, from the first column in column format or from the row
// headers in row format, and merged into the hash entry keyed by walMeta.id:
// an existing entry has its skey/ekey widened, otherwise a new one is inserted.
// An embedded auto-create request is forwarded to processAutoCreateTableNew().
//
// Returns 0 on success, TSDB_CODE_INVALID_MSG on a decode failure, or the
// error of a failing helper.
static int32_t scanSubmitTbDataForMeta(SDecoder *pCoder, SStreamTriggerReaderInfo* sStreamReaderInfo, SSHashObj* gidHash, int64_t ver) {
  int32_t code = 0;
  int32_t lino = 0;
  WalMetaResult walMeta = {0};
  SSubmitTbData submitTbData = {0};

  if (tStartDecode(pCoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  uint8_t       version = 0;
  if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }
  // The second byte carries the encoding version, the low byte the SUBMIT_REQ_* flags.
  version = (submitTbData.flags >> 8) & 0xff;
  submitTbData.flags = submitTbData.flags & 0xff;

  // STREAM_CHECK_CONDITION_GOTO(version < 2, TDB_CODE_SUCCESS);
  if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
    submitTbData.pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
    STREAM_CHECK_NULL_GOTO(submitTbData.pCreateTbReq, terrno);
    STREAM_CHECK_RET_GOTO(tDecodeSVCreateTbReq(pCoder, submitTbData.pCreateTbReq));
    STREAM_CHECK_RET_GOTO(processAutoCreateTableNew(sStreamReaderInfo, submitTbData.pCreateTbReq, ver));
  }

  // submit data
  if (tDecodeI64(pCoder, &submitTbData.suid) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }
  if (tDecodeI64(pCoder, &submitTbData.uid) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  // Tables outside the reader's set are not reported; stop decoding this section.
  if (!uidInTableListSet(sStreamReaderInfo, submitTbData.suid, submitTbData.uid, &walMeta.id, false)){
    goto end;
  }
  if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    uint64_t nColData = 0;
    if (tDecodeU64v(pCoder, &nColData) < 0) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, end);
    }

    SColData colData = {0};
    code = tDecodeColData(version, pCoder, &colData, false);
    if (code) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, end);
    }

    // The first column is read as the timestamp column below; it must hold
    // values only and be non-empty so that pData[nVal - 1] is a valid index.
    if (colData.flag != HAS_VALUE || colData.nVal <= 0) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, end);
    }
    walMeta.skey = ((TSKEY *)colData.pData)[0];
    walMeta.ekey = ((TSKEY *)colData.pData)[colData.nVal - 1];

    // Decode the remaining columns so the decoder ends up past this section.
    for (uint64_t i = 1; i < nColData; i++) {
      code = tDecodeColData(version, pCoder, &colData, true);
      if (code) {
        code = TSDB_CODE_INVALID_MSG;
        TSDB_CHECK_CODE(code, lino, end);
      }
    }
  } else {
    uint64_t nRow = 0;
    if (tDecodeU64v(pCoder, &nRow) < 0) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, end);
    }

    // The index has the same unsigned 64-bit type as the decoded row count, so
    // the comparisons below involve no signed/unsigned conversion.
    for (uint64_t iRow = 0; iRow < nRow; ++iRow) {
      SRow *pRow = (SRow *)(pCoder->data + pCoder->pos);
      pCoder->pos += pRow->len;  // rows are stored back to back; step over this one
      if (iRow == 0){
#ifndef NO_UNALIGNED_ACCESS
        walMeta.skey = pRow->ts;
#else
        walMeta.skey = taosGetInt64Aligned(&pRow->ts);
#endif
      }
      if (iRow == nRow - 1) {
#ifndef NO_UNALIGNED_ACCESS
        walMeta.ekey = pRow->ts;
#else
        walMeta.ekey = taosGetInt64Aligned(&pRow->ts);
#endif
      }
    }
  }

  // Merge into the per-id range: widen an existing entry or insert a new one.
  WalMetaResult* data = (WalMetaResult*)tSimpleHashGet(gidHash, &walMeta.id, LONG_BYTES);
  if (data != NULL) {
    if (walMeta.skey < data->skey) data->skey = walMeta.skey;
    if (walMeta.ekey > data->ekey) data->ekey = walMeta.ekey;
  } else {
    STREAM_CHECK_RET_GOTO(tSimpleHashPut(gidHash, &walMeta.id, LONG_BYTES, &walMeta, sizeof(WalMetaResult)));
  }

end:
  tDestroySVSubmitCreateTbReq(submitTbData.pCreateTbReq, TSDB_MSG_FLG_DECODE);
  taosMemoryFreeClear(submitTbData.pCreateTbReq);
  tEndDecode(pCoder);
  return code;
}
1024

1025
// Scans one submit WAL entry and appends one meta row per affected id to
// rsp->metaBlock.
//
// Every per-table section is passed to scanSubmitTbDataForMeta(), which folds
// the covered time range into a temporary hash. Each hash entry then becomes
// one row (id, skey, ekey, ver) via buildWalMetaBlockNew(), and both the
// block's row count and rsp->totalRows are incremented.
//
// Returns 0 on success, TSDB_CODE_INVALID_MSG on a decode failure, or the
// error of a failing helper.
static int32_t scanSubmitDataForMeta(SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* rsp, void* data, int32_t len, int64_t ver) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SDecoder decoder = {0};
  SSHashObj* gidHash = NULL;
  void* pTask = sStreamReaderInfo->pTask;  // referenced by the ST_TASK_* log macros

  tDecoderInit(&decoder, data, len);
  if (tStartDecode(&decoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  uint64_t nSubmitTbData = 0;
  if (tDecodeU64v(&decoder, &nSubmitTbData) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  gidHash = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
  STREAM_CHECK_NULL_GOTO(gidHash, terrno);

  // The counter has the same unsigned 64-bit type as the decoded section count.
  for (uint64_t i = 0; i < nSubmitTbData; i++) {
    STREAM_CHECK_RET_GOTO(scanSubmitTbDataForMeta(&decoder, sStreamReaderInfo, gidHash, ver));
  }
  tEndDecode(&decoder);

  // Reserve room for one additional row per collected id.
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->metaBlock, ((SSDataBlock*)rsp->metaBlock)->info.rows + tSimpleHashGetSize(gidHash)));
  int32_t iter = 0;
  void*   px = tSimpleHashIterate(gidHash, NULL, &iter);
  while (px != NULL) {
    WalMetaResult* pMeta = (WalMetaResult*)px;
    STREAM_CHECK_RET_GOTO(buildWalMetaBlockNew(rsp->metaBlock, pMeta->id, pMeta->skey, pMeta->ekey, ver));
    ((SSDataBlock*)rsp->metaBlock)->info.rows++;
    rsp->totalRows++;
    ST_TASK_DLOG("stream reader scan submit data:skey %" PRId64 ", ekey %" PRId64 ", id %" PRIu64
          ", ver:%"PRId64, pMeta->skey, pMeta->ekey, pMeta->id, ver);
    px = tSimpleHashIterate(gidHash, px, &iter);
  }
end:
  tDecoderClear(&decoder);
  tSimpleHashCleanup( gidHash);
  return code;
}
1069

1070
// Builds the data block used for tsdb meta results and stores it in *pBlock.
// Column order: skey, ekey, uid, gid (non-vtable only), nrows. The index passed
// to qStreamBuildSchema() starts at 1 and increases by one per column.
// Returns 0 on success or the failing helper's error code.
static int32_t createBlockForTsdbMeta(SSDataBlock** pBlock, bool isVTable) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t colIdx = 1;
  SArray* cols = taosArrayInit(8, sizeof(SSchema));
  STREAM_CHECK_NULL_GOTO(cols, terrno);

  // skey
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(cols, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, colIdx++));
  // ekey
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(cols, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, colIdx++));
  // uid
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(cols, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, colIdx++));
  if (!isVTable) {
    // gid
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(cols, TSDB_DATA_TYPE_UBIGINT, LONG_BYTES, colIdx++));
  }
  // nrows
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(cols, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, colIdx++));

  STREAM_CHECK_RET_GOTO(createDataBlockForStream(cols, pBlock));

end:
  taosArrayDestroy(cols);
  return code;
}
1091

1092
// Builds the data block used for WAL meta rows and stores it in *pBlock.
// All four columns are BIGINT, registered with indexes 0..3 in this order:
//   0: gid (non-vtable) / uid (vtable), 1: skey, 2: ekey, 3: ver
// Returns 0 on success or the failing helper's error code.
static int32_t createBlockForWalMetaNew(SSDataBlock** pBlock) {
  int32_t code = 0;
  int32_t lino = 0;
  SArray* cols = taosArrayInit(8, sizeof(SSchema));
  STREAM_CHECK_NULL_GOTO(cols, terrno);

  for (int32_t colIdx = 0; colIdx < 4; colIdx++) {
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(cols, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, colIdx));
  }

  STREAM_CHECK_RET_GOTO(createDataBlockForStream(cols, pBlock));

end:
  taosArrayDestroy(cols);
  return code;
}
1112

1113
// Dispatches a non-submit WAL entry to the matching meta scanner.
//  - TDMT_VND_DELETE (only when deleteReCalc is set): lazily creates
//    rsp->deleteBlock, then scans the delete request.
//  - TDMT_VND_DROP_TABLE (only when deleteOutTbl is set or this is a vtable
//    stream): scans the drop request.
//  - TDMT_VND_CREATE_TABLE / TDMT_VND_ALTER_TABLE: scanned unless
//    ignoreMetaChange() says the change at `ver` should be ignored.
//  - TDMT_VND_ALTER_STB: currently a no-op (its handler is disabled).
// Any other message type is ignored. Returns 0 or the scanner's error code.
static int32_t processMeta(int16_t msgType, SStreamTriggerReaderInfo* sStreamReaderInfo, void *data, int32_t len, SSTriggerWalNewRsp* rsp, int64_t ver) {
  int32_t code = 0;
  int32_t lino = 0;
  void* pTask = sStreamReaderInfo->pTask;  // referenced by the ST_TASK_* log macros

  ST_TASK_DLOG("%s check meta msg, stream ver:%" PRId64 ", wal ver:%" PRId64, __func__, sStreamReaderInfo->tableList.version, ver);

  // Each scanner sets up its own decoder over `data`; none is needed here.
  if (msgType == TDMT_VND_DELETE && sStreamReaderInfo->deleteReCalc != 0) {
    if (rsp->deleteBlock == NULL) {
      STREAM_CHECK_RET_GOTO(createBlockForWalMetaNew((SSDataBlock**)&rsp->deleteBlock));
    }

    STREAM_CHECK_RET_GOTO(scanDeleteDataNew(sStreamReaderInfo, rsp, data, len, ver));
  } else if (msgType == TDMT_VND_DROP_TABLE && 
    (sStreamReaderInfo->deleteOutTbl != 0 || sStreamReaderInfo->isVtableStream)) {
    STREAM_CHECK_RET_GOTO(scanDropTableNew(sStreamReaderInfo, rsp, data, len, ver));
  // } else if (msgType == TDMT_VND_DROP_STB) {
  //   STREAM_CHECK_RET_GOTO(scanDropSTableNew(sStreamReaderInfo, data, len));
  } else if (msgType == TDMT_VND_CREATE_TABLE && !ignoreMetaChange(sStreamReaderInfo->tableList.version, ver)) {
    STREAM_CHECK_RET_GOTO(scanCreateTableNew(sStreamReaderInfo, rsp, data, len, ver));
  } else if (msgType == TDMT_VND_ALTER_STB && !ignoreMetaChange(sStreamReaderInfo->tableList.version, ver)) {
    // STREAM_CHECK_RET_GOTO(scanAlterSTableNew(sStreamReaderInfo, data, len));
  } else if (msgType == TDMT_VND_ALTER_TABLE && !ignoreMetaChange(sStreamReaderInfo->tableList.version, ver)) {
    STREAM_CHECK_RET_GOTO(scanAlterTableNew(sStreamReaderInfo, rsp, data, len, ver));
  }

end:
  return code;
}
1145
// Scans the WAL forward from rsp->ver and collects meta information into `rsp`.
//
// Submit entries go to scanSubmitDataForMeta(), all other entries to
// processMeta(). rsp->ver and rsp->verTime follow the entry being read. The
// scan stops when the WAL has no further entry, when an entry's ingest time
// (ingestTs / 1000) is later than `ctime`, or once rsp->totalRows reaches
// STREAM_RETURN_ROWS_NUM. Reaching the end of the WAL is reported as success.
static int32_t processWalVerMetaNew(SVnode* pVnode, SSTriggerWalNewRsp* rsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
                       int64_t ctime) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   pTask = sStreamReaderInfo->pTask;  // referenced by the ST_TASK_* log macros

  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);

  code = walReaderSeekVer(pWalReader, rsp->ver);
  if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) {
    // Requested version is not in the WAL: either clamp it up to the first
    // available version, or just stamp the current time.
    const bool beforeFirst = rsp->ver < walGetFirstVer(pWalReader->pWal);
    if (beforeFirst) {
      rsp->ver = walGetFirstVer(pWalReader->pWal);
    }
    rsp->verTime = beforeFirst ? 0 : taosGetTimestampUs();
    ST_TASK_DLOG("vgId:%d %s scan wal end:%s", TD_VID(pVnode), __func__, tstrerror(code));
    code = TSDB_CODE_SUCCESS;
    goto end;
  }
  STREAM_CHECK_RET_GOTO(code);

  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->metaBlock, STREAM_RETURN_ROWS_NUM));
  do {
    code = walNextValidMsg(pWalReader, true);
    if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) {
      rsp->verTime = taosGetTimestampUs();
      ST_TASK_DLOG("vgId:%d %s scan wal end:%s", TD_VID(pVnode), __func__, tstrerror(code));
      code = TSDB_CODE_SUCCESS;
      goto end;
    }
    STREAM_CHECK_RET_GOTO(code);

    rsp->ver = pWalReader->curVersion;
    SWalCont* wCont = &pWalReader->pHead->head;
    rsp->verTime = wCont->ingestTs;
    if (wCont->ingestTs / 1000 > ctime) {
      break;
    }

    const int64_t ver = wCont->version;
    ST_TASK_DLOG("vgId:%d stream reader scan wal ver:%" PRId64 "/%" PRId64 ", type:%s, deleteData:%d, deleteTb:%d",
      TD_VID(pVnode), ver, walGetAppliedVer(pWalReader->pWal), TMSG_INFO(wCont->msgType), sStreamReaderInfo->deleteReCalc, sStreamReaderInfo->deleteOutTbl);

    // Submit bodies are prefixed by SSubmitReq2Msg, all other bodies by SMsgHead.
    const bool   isSubmit = (wCont->msgType == TDMT_VND_SUBMIT);
    const size_t headLen = isSubmit ? sizeof(SSubmitReq2Msg) : sizeof(SMsgHead);
    void*        data = POINTER_SHIFT(wCont->body, headLen);
    int32_t      len = wCont->bodyLen - headLen;

    if (isSubmit) {
      STREAM_CHECK_RET_GOTO(scanSubmitDataForMeta(sStreamReaderInfo, rsp, data, len, ver));
    } else {
      STREAM_CHECK_RET_GOTO(processMeta(wCont->msgType, sStreamReaderInfo, data, len, rsp, ver));
    }
  } while (rsp->totalRows < STREAM_RETURN_ROWS_NUM);

end:
  walCloseReader(pWalReader);
  return code;
}
1204

1205
// Loads the tag / tbname values of table `uid` and stores them in `metaCache`,
// one slot per expression in `pExprInfo`.
//
// The cache maps uid -> SArray of char* with numOfExpr slots. For a uid that is
// not cached yet a new array is created and filled; for a cached uid the
// existing slots are freed and replaced. When `colId` is non-zero, tag
// expressions for other columns are skipped. A NULL slot represents a missing
// value. If `lock` is non-NULL it is held as a write lock for the whole call.
//
// Returns 0 on success (including numOfExpr == 0), TSDB_CODE_INVALID_PARA when
// a cached array's size differs from numOfExpr, or the failing helper's error.
int32_t cacheTag(SVnode* pVnode, SHashObj* metaCache, SExprInfo* pExprInfo, int32_t numOfExpr, SStorageAPI* api, uint64_t uid, col_id_t colId, SRWLatch* lock) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SMetaReader mr = {0};
  SArray*     tagCache = NULL;
  char*       data = NULL;  // value for the current slot, owned here until stored in the cache

  if (lock != NULL) taosWLockLatch(lock);
  STREAM_CHECK_CONDITION_GOTO(numOfExpr == 0, code);
  stDebug("%s start,uid:%"PRIu64, __func__, uid);

  void* uidData = taosHashGet(metaCache, &uid, LONG_BYTES);
  if (uidData != NULL) {
    tagCache = *(SArray**)uidData;
    stDebug("%s found tagCache, size:%zu %d, uid:%"PRIu64, __func__, taosArrayGetSize(tagCache), numOfExpr, uid);
    STREAM_CHECK_CONDITION_GOTO(taosArrayGetSize(tagCache) != numOfExpr, TSDB_CODE_INVALID_PARA);
  } else {
    tagCache = taosArrayInit(numOfExpr, POINTER_BYTES);
    STREAM_CHECK_NULL_GOTO(tagCache, terrno);
    if (taosHashPut(metaCache, &uid, LONG_BYTES, &tagCache, POINTER_BYTES) != 0) {
      taosArrayDestroy(tagCache);
      code = terrno;
      goto end;
    }
  }

  // Fetch the table's meta entry; the reader lock is released right after the lookup.
  api->metaReaderFn.initReader(&mr, pVnode, META_READER_LOCK, &api->metaFn);
  code = api->metaReaderFn.getEntryGetUidCache(&mr, uid);
  api->metaReaderFn.readerReleaseLock(&mr);
  STREAM_CHECK_RET_GOTO(code);

  for (int32_t slot = 0; slot < numOfExpr; ++slot) {
    const SExprInfo* pExpr = &pExprInfo[slot];

    if (fmIsScanPseudoColumnFunc(pExpr->pExpr->_function.functionId)) {
      // Pseudo column: only tbname yields a value, stored as a var-string.
      if (pExpr->pExpr->_function.functionType == FUNCTION_TYPE_TBNAME) {
        data = taosMemoryCalloc(1, strlen(mr.me.name) + VARSTR_HEADER_SIZE);
        STREAM_CHECK_NULL_GOTO(data, terrno);
        STR_TO_VARSTR(data, mr.me.name)
      }
    } else {  // regular tag
      int8_t  type = pExpr->base.resSchema.type;
      int32_t bytes = pExpr->base.resSchema.bytes;
      STagVal tagVal = {.cid = pExpr->base.pParam[0].pCol->colId};

      // A non-zero colId restricts the refresh to that single tag column.
      if (colId != 0 && tagVal.cid != colId) {
        continue;
      }

      const char* rawTag = api->metaFn.extractTagVal(mr.me.ctbEntry.pTags, type, &tagVal);
      char*       tagData = (char*)rawTag;
      if (type != TSDB_DATA_TYPE_JSON && rawTag != NULL) {
        tagData = tTagValToData((const STagVal*)rawTag, false);
      }

      if (tagData != NULL && (type == TSDB_DATA_TYPE_JSON || !IS_VAR_DATA_TYPE(type))) {
        // JSON and fixed-size values are copied into a buffer of their own.
        if (type == TSDB_DATA_TYPE_JSON) {
          bytes = getJsonValueLen(tagData);
        }
        data = taosMemoryCalloc(1, bytes);
        STREAM_CHECK_NULL_GOTO(data, terrno);
        (void)memcpy(data, tagData, bytes);
      } else {
        // Other values are stored as returned (may be NULL).
        data = tagData;
      }
    }

    if (uidData != NULL) {
      // Existing entry: release the previous value of this slot, then replace it.
      void* previous = taosArrayGetP(tagCache, slot);
      taosMemoryFree(previous);
      taosArraySet(tagCache, slot, &data);
    } else {
      STREAM_CHECK_NULL_GOTO(taosArrayPush(tagCache, &data), terrno);
    }
    data = NULL;  // now referenced by the cache
  }

end:
  taosMemoryFree(data);
  api->metaReaderFn.clearReader(&mr);
  if (lock != NULL) taosWUnLockLatch(lock);
  return code;
}
1294

1295
// Fills the tag / tbname columns of `pBlock` for rows
// [currentRow, currentRow + numOfRows) from the values cached for table `uid`.
//
// For every expression the destination column is looked up by its result slot
// id. A missing cache entry, a cache array whose size differs from numOfExpr,
// a NULL cached value, or a JSON null all result in NULLs being written for
// the row range; otherwise the cached value is replicated with
// colDataSetNItems(). tbname columns get their column id set to -1.
//
// `lock` is taken as a read lock for the duration of the lookup; a NULL lock
// disables locking, matching how cacheTag() treats its lock argument.
//
// Returns 0 on success (including numOfExpr == 0), terrno when a destination
// column cannot be found, or the error from colDataSetNItems().
int32_t fillTag(SHashObj* metaCache, SExprInfo* pExprInfo, int32_t numOfExpr,
                uint64_t uid, SSDataBlock* pBlock, uint32_t currentRow, uint32_t numOfRows, uint32_t numOfBlocks, SRWLatch* lock) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SArray* tagCache = NULL;
  if (numOfExpr == 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (lock != NULL) taosRLockLatch(lock);
  void* uidData = taosHashGet(metaCache, &uid, LONG_BYTES);
  if (uidData == NULL) {
    stError("%s error uidData is null,uid:%"PRIu64, __func__, uid);
  } else {
    tagCache = *(SArray**)uidData;
    if(taosArrayGetSize(tagCache) != numOfExpr) {
      stError("%s numOfExpr:%d,tagCache size:%zu", __func__, numOfExpr, taosArrayGetSize(tagCache));
      tagCache = NULL;  // treat a mismatching cache like a missing one
    }
  }
  
  for (int32_t j = 0; j < numOfExpr; ++j) {
    const SExprInfo* pExpr1 = &pExprInfo[j];
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
    STREAM_CHECK_NULL_GOTO(pColInfoData, terrno);
    int32_t functionId = pExpr1->pExpr->_function.functionId;

    // this is to handle the tbname
    if (fmIsScanPseudoColumnFunc(functionId)) {
      int32_t fType = pExpr1->pExpr->_function.functionType;
      if (fType == FUNCTION_TYPE_TBNAME) {
        pColInfoData->info.colId = -1;
      }
    } 
    char* data = tagCache == NULL ? NULL : taosArrayGetP(tagCache, j);

    bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
    if (isNullVal) {
      colDataSetNNULL(pColInfoData, currentRow, numOfRows);
    } else {
      // Fixed-size columns: clear the null bits of the target rows before writing.
      if (!IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
        for (uint32_t i = 0; i < numOfRows; i++){
          colDataClearNull_f(pColInfoData->nullbitmap, currentRow + i);
        }
      }
      code = colDataSetNItems(pColInfoData, currentRow, (const char*)data, numOfRows, numOfBlocks, false);
      STREAM_CHECK_RET_GOTO(code);
    }
  }
end:
  if (lock != NULL) taosRUnLockLatch(lock);
  return code;
}
1350

1351
/*
 * Fill the tag columns of pBlock for table `uid` by delegating to fillTag().
 *
 * isCalc selects which set of reader state is used: the "calc" meta cache,
 * tag expressions and expression count, or the "trigger" ones.
 * currentRow / numOfRows / numOfBlocks are forwarded to fillTag() unchanged.
 *
 * Returns the code produced by fillTag().
 */
static int32_t processTag(SStreamTriggerReaderInfo* info, bool isCalc,
  uint64_t uid, SSDataBlock* pBlock, uint32_t currentRow, uint32_t numOfRows, uint32_t numOfBlocks) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   pTask = info->pTask;  // presumably consumed by the ST_TASK_* log macros

  ST_TASK_DLOG("%s start. rows:%" PRIu32 ",uid:%"PRIu64, __func__,  numOfRows, uid);

  SHashObj*  metaCache = NULL;
  SExprInfo* pExprInfo = NULL;
  int32_t    numOfExpr = 0;
  if (isCalc) {
    metaCache = info->pTableMetaCacheCalc;
    pExprInfo = info->pExprInfoCalcTag;
    numOfExpr = info->numOfExprCalcTag;
  } else {
    metaCache = info->pTableMetaCacheTrigger;
    pExprInfo = info->pExprInfoTriggerTag;
    numOfExpr = info->numOfExprTriggerTag;
  }

  STREAM_CHECK_RET_GOTO(
      fillTag(metaCache, pExprInfo, numOfExpr, uid, pBlock, currentRow, numOfRows, numOfBlocks, &info->lock));

end:
  return code;
}
1369

1370
/*
 * Compute the row index range [*rowStart, *rowEnd) of pCol that falls inside
 * `window`, and the number of rows in it (*nRows).
 *
 * - window == NULL: the range is the whole column, [0, pCol->nVal).
 * - otherwise: *rowStart is the first row whose value is >= window->skey and
 *   *rowEnd is the first row whose value is > window->ekey (or pCol->nVal when
 *   no such row exists). If no row reaches skey, or the two indices coincide,
 *   *nRows stays 0 and the function returns success.
 *
 * NOTE(review): the scan only records the first match for each bound, so it
 * presumably expects the column values (timestamps) in ascending order.
 *
 * Returns 0 on success, or the error from tColDataGetValue().
 */
int32_t getRowRange(SColData* pCol, STimeWindow* window, int32_t* rowStart, int32_t* rowEnd, int32_t* nRows) {
  int32_t code = 0;
  int32_t lino = 0;
  SColVal cell = {0};

  *nRows = 0;
  if (window == NULL) {
    *rowStart = 0;
    *rowEnd = pCol->nVal;
  } else {
    *rowStart = -1;
    *rowEnd = -1;
    for (int32_t idx = 0; idx < pCol->nVal; ++idx) {
      STREAM_CHECK_RET_GOTO(tColDataGetValue(pCol, idx, &cell));
      int64_t ts = VALUE_GET_TRIVIAL_DATUM(&cell.value);
      if (*rowStart == -1 && ts >= window->skey) {
        *rowStart = idx;
      }
      if (*rowEnd == -1 && ts > window->ekey) {
        *rowEnd = idx;
      }
    }
    // Nothing inside the window: leave *nRows at 0 and report success.
    STREAM_CHECK_CONDITION_GOTO(*rowStart == -1 || *rowStart == *rowEnd, TDB_CODE_SUCCESS);

    // *rowStart is known to be valid here; an open upper bound means "to the end".
    if (*rowEnd == -1) {
      *rowEnd = pCol->nVal;
    }
  }
  *nRows = *rowEnd - *rowStart;

end:
  return code;
}
1401

UNCOV
1402
/*
 * Copy rows [rowStart, rowEnd) of `colData` into the result column `pColData`.
 * Source row k is written to destination row `rows + k - rowStart`; a source
 * cell that is not a value is written as NULL.
 *
 * Returns 0 on success, or the first error from tColDataGetValue() /
 * colDataSetVal().
 */
static int32_t setColData(int64_t rows, int32_t rowStart, int32_t rowEnd, SColData* colData, SColumnInfoData* pColData) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t srcRow = rowStart;

  while (srcRow < rowEnd) {
    SColVal cell = {0};
    STREAM_CHECK_RET_GOTO(tColDataGetValue(colData, srcRow, &cell));

    bool isNull = !COL_VAL_IS_VALUE(&cell);
    STREAM_CHECK_RET_GOTO(
        colDataSetVal(pColData, rows + srcRow - rowStart, VALUE_GET_DATUM(&cell.value, cell.value.type), isNull));
    srcRow++;
  }

end:
  return code;
}
1414

1415
/*
 * Look up the column id mapped to slot index `i` for the table (suid, uid).
 *
 * The {suid, uid} pair is the key into uidHashCalc or uidHashTrigger (chosen
 * by rsp->isCalc); the value found there is a nested hash keyed by `i`.
 * *colId receives the mapped id, or -1 when the nested hash has no entry
 * for `i`.
 *
 * The whole lookup runs under the reader's read latch.
 * Returns TSDB_CODE_INVALID_PARA when the table has no entry (or a NULL
 * nested hash), 0 otherwise.
 */
static int32_t getColId(int64_t suid, int64_t uid, int16_t i, SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* rsp, int16_t* colId) {
  int32_t code = 0;
  int32_t lino = 0;
  int64_t tableKey[2] = {suid, uid};

  taosRLockLatch(&sStreamReaderInfo->lock);

  void* pSlot = tSimpleHashGet(rsp->isCalc ? sStreamReaderInfo->uidHashCalc : sStreamReaderInfo->uidHashTrigger,
                               tableKey, sizeof(tableKey));
  STREAM_CHECK_NULL_GOTO(pSlot, TSDB_CODE_INVALID_PARA);

  SSHashObj* colMap = *(SSHashObj**)pSlot;
  STREAM_CHECK_NULL_GOTO(colMap, TSDB_CODE_INVALID_PARA);

  int16_t* pMapped = tSimpleHashGet(colMap, &i, sizeof(i));
  *colId = (pMapped == NULL) ? -1 : *pMapped;

end:
  taosRUnLockLatch(&sStreamReaderInfo->lock);
  return code;
}
1435

1436
/*
 * Resolve the STSchema with version `sver` for the table identified by
 * suid (or uid when suid is 0) and return it through *schema.
 *
 * - Vtable streams: schemas are cached per table id in
 *   triggerTableSchemaMapVTable. A missing entry or a version mismatch
 *   triggers a fresh metaGetTbTSchema() load that is stored in the map.
 * - Otherwise: a single cached schema (triggerTableSchema) is kept on the
 *   reader and reloaded when its version differs from sver.
 *
 * The returned pointer is owned by the cache, not by the caller.
 * On failure a non-zero code is returned; in the vtable path *schema is
 * set to NULL if the freshly loaded schema had to be released.
 *
 * NOTE(review): on a version mismatch the vtable path overwrites the map
 * entry for `id`; confirm the map releases the previous schema.
 */
static int32_t getSchemas(SVnode* pVnode, int64_t suid, int64_t uid, int32_t sver, SStreamTriggerReaderInfo* sStreamReaderInfo, STSchema** schema) {
  int32_t code = 0;
  int32_t lino = 0;
  int64_t id = suid != 0 ? suid : uid;
  if (sStreamReaderInfo->isVtableStream) {
    STSchema** schemaTmp = taosHashGet(sStreamReaderInfo->triggerTableSchemaMapVTable, &id, LONG_BYTES);
    if (schemaTmp == NULL || *schemaTmp == NULL || (*schemaTmp)->version != sver) {
      *schema = metaGetTbTSchema(pVnode->pMeta, id, sver, 1);
      STREAM_CHECK_NULL_GOTO(*schema, terrno);
      code = taosHashPut(sStreamReaderInfo->triggerTableSchemaMapVTable, &id, LONG_BYTES, schema, POINTER_BYTES);
      if (code != 0) {
        taosMemoryFree(*schema);
        // Do not hand a dangling pointer back to the caller.
        *schema = NULL;
        goto end;
      }
    } else {
      *schema = *schemaTmp;
    }
  } else {
    if (sStreamReaderInfo->triggerTableSchema == NULL || sStreamReaderInfo->triggerTableSchema->version != sver) {
      // Drop the stale schema before loading the requested version.
      taosMemoryFree(sStreamReaderInfo->triggerTableSchema);
      sStreamReaderInfo->triggerTableSchema = metaGetTbTSchema(pVnode->pMeta, id, sver, 1);
      STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->triggerTableSchema, terrno);
    }
    *schema = sStreamReaderInfo->triggerTableSchema;
  }

end:
  return code;
}
1465

1466
/*
 * Decode one per-table submit entry from pCoder and copy its rows into
 * rsp->dataBlock.
 *
 * Steps, as implemented below:
 *  1. Decode flags (the submit version lives in bits 8..15), skip an embedded
 *     auto-create-table request if present, then decode suid and uid.
 *  2. Resolve the group id: from rsp->uidHash when it exists, otherwise via
 *     uidInTableListSet(). Tables that are not found are skipped with success.
 *  3. When `ranges` is given, look up the [skey, ekey] window for the group
 *     id; a missing window skips the entry with success.
 *  4. Decode the payload (column format or row format) and write the rows
 *     inside the window into pBlock, starting at the table slice's
 *     currentRowIdx (slice taken from rsp->indexHash).
 *  5. For non-vtable streams fill tag columns via processTag(); write `ver`
 *     into the last column of pBlock; advance the slice and pBlock row count.
 *  6. When gidHash is non-NULL, merge the first/last timestamp of the submit
 *     into the WalMetaResult stored for the group id.
 *
 * Every exit path goes through `end`, which logs a failure and calls
 * tEndDecode(pCoder).
 *
 * Returns 0 on success or skip, TSDB_CODE_INVALID_MSG for undecodable input,
 * TSDB_CODE_INVALID_PARA when the table has no slice in rsp->indexHash, or
 * the error of a failing helper.
 */
static int32_t scanSubmitTbData(SVnode* pVnode, SDecoder *pCoder, SStreamTriggerReaderInfo* sStreamReaderInfo,
  SSHashObj* ranges, SSHashObj* gidHash, SSTriggerWalNewRsp* rsp, int64_t ver) {
  int32_t code = 0;
  int32_t lino = 0;
  uint64_t id = 0;
  WalMetaResult walMeta = {0};
  void* pTask = sStreamReaderInfo->pTask;
  SSDataBlock * pBlock = (SSDataBlock*)rsp->dataBlock;

  if (tStartDecode(pCoder) < 0) {
    ST_TASK_ELOG("vgId:%d %s invalid submit data", TD_VID(pVnode), __func__);
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  SSubmitTbData submitTbData = {0};
  uint8_t       version = 0;
  if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) {
    ST_TASK_ELOG("vgId:%d %s invalid submit data flags", TD_VID(pVnode), __func__);
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }
  // Second byte of the decoded flags carries the version; the low byte is the flag set.
  version = (submitTbData.flags >> 8) & 0xff;
  submitTbData.flags = submitTbData.flags & 0xff;
  // STREAM_CHECK_CONDITION_GOTO(version < 2, TDB_CODE_SUCCESS);
  if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
    // The create-table request is not needed here: open and close its scope to skip it.
    if (tStartDecode(pCoder) < 0) {
      ST_TASK_ELOG("vgId:%d %s invalid auto create table data", TD_VID(pVnode), __func__);
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, end);
    }
    tEndDecode(pCoder);
  }

  // submit data
  if (tDecodeI64(pCoder, &submitTbData.suid) < 0) {
    ST_TASK_ELOG("vgId:%d %s invalid submit data suid", TD_VID(pVnode), __func__);
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }
  if (tDecodeI64(pCoder, &submitTbData.uid) < 0) {
    ST_TASK_ELOG("vgId:%d %s invalid submit data uid", TD_VID(pVnode), __func__);
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  ST_TASK_DLOG("%s uid:%" PRId64 ", suid:%" PRId64 ", ver:%" PRId64, __func__, submitTbData.uid, submitTbData.suid, ver);

  if (rsp->uidHash != NULL) {
    uint64_t* gid = tSimpleHashGet(rsp->uidHash, &submitTbData.uid, LONG_BYTES);
    STREAM_CHECK_CONDITION_GOTO(gid == NULL, TDB_CODE_SUCCESS);
    ST_TASK_DLOG("%s get uid gid from uidHash, uid:%" PRId64 ", suid:%" PRId64 " gid:%"PRIu64, __func__, submitTbData.uid, submitTbData.suid, *gid);
    id = *gid;
  } else {
    STREAM_CHECK_CONDITION_GOTO(!uidInTableListSet(sStreamReaderInfo, submitTbData.suid, submitTbData.uid, &id, rsp->isCalc), TDB_CODE_SUCCESS);
  }

  walMeta.id = id;
  STimeWindow window = {.skey = INT64_MIN, .ekey = INT64_MAX};

  if (ranges != NULL){
    void* timerange = tSimpleHashGet(ranges, &id, sizeof(id));
    if (timerange == NULL) goto end;
    int64_t* pRange = (int64_t*)timerange;
    window.skey = pRange[0];
    window.ekey = pRange[1];
    ST_TASK_DLOG("%s get time range from ranges, uid:%" PRId64 ", suid:%" PRId64 ", gid:%" PRIu64 ", skey:%" PRId64 ", ekey:%" PRId64,
      __func__, submitTbData.uid, submitTbData.suid, id, window.skey, window.ekey);
  }

  if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) {
    ST_TASK_ELOG("vgId:%d %s invalid submit data sver", TD_VID(pVnode), __func__);
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  STSchema*    schema = NULL;
  STREAM_CHECK_RET_GOTO(getSchemas(pVnode, submitTbData.suid, submitTbData.uid, submitTbData.sver, sStreamReaderInfo, &schema));

  SStreamWalDataSlice* pSlice = (SStreamWalDataSlice*)tSimpleHashGet(rsp->indexHash, &submitTbData.uid, LONG_BYTES);
  int32_t blockStart = 0;
  int32_t numOfRows = 0;
  if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    uint64_t nColData = 0;
    if (tDecodeU64v(pCoder, &nColData) < 0) {
      ST_TASK_ELOG("vgId:%d %s invalid submit data nColData", TD_VID(pVnode), __func__);
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, end);
    }

    // The first encoded column is read as the timestamp column.
    SColData colData = {0};
    code = tDecodeColData(version, pCoder, &colData, false);
    if (code) {
      ST_TASK_ELOG("vgId:%d %s invalid submit data colData", TD_VID(pVnode), __func__);
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, end);
    }

    if (colData.flag != HAS_VALUE) {
      ST_TASK_ELOG("vgId:%d %s invalid submit data colData flag", TD_VID(pVnode), __func__);
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, end);
    }

    walMeta.skey = ((TSKEY *)colData.pData)[0];
    walMeta.ekey = ((TSKEY *)colData.pData)[colData.nVal - 1];

    int32_t rowStart = 0;
    int32_t rowEnd = 0;
    STREAM_CHECK_RET_GOTO(getRowRange(&colData, &window, &rowStart, &rowEnd, &numOfRows));
    STREAM_CHECK_CONDITION_GOTO(numOfRows <= 0, TDB_CODE_SUCCESS);

    STREAM_CHECK_NULL_GOTO(pSlice, TSDB_CODE_INVALID_PARA);
    blockStart = pSlice->currentRowIdx;
    // Remember where the non-timestamp columns begin so each output column can rescan them.
    int32_t pos = pCoder->pos;
    for (int16_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
      STREAM_CHECK_NULL_GOTO(pColData, terrno);
      if (pColData->info.colId <= -1) {
        pColData->hasNull = true;
        continue;
      }
      if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
        STREAM_CHECK_RET_GOTO(setColData(blockStart, rowStart, rowEnd, &colData, pColData));
        continue;
      }

      pCoder->pos = pos;

      int16_t colId = 0;
      if (sStreamReaderInfo->isVtableStream){
        STREAM_CHECK_RET_GOTO(getColId(submitTbData.suid, submitTbData.uid, i, sStreamReaderInfo, rsp, &colId));
        ST_TASK_TLOG("%s vtable colId:%d, i:%d, uid:%" PRId64, __func__, colId, i, submitTbData.uid);
      } else {
        colId = pColData->info.colId;
      }

      uint64_t j = 1;
      for (; j < nColData; j++) {
        // Peek at the column id of the next encoded column without consuming it.
        int16_t cid = 0;
        int32_t posTmp = pCoder->pos;
        pCoder->pos += INT_BYTES;
        // Route the failure through `end` so it is logged and the decode scope
        // is closed, like every other error path in this function.
        code = tDecodeI16v(pCoder, &cid);
        TSDB_CHECK_CODE(code, lino, end);
        pCoder->pos = posTmp;
        if (cid == colId) {
          SColData colDataTmp = {0};
          code = tDecodeColData(version, pCoder, &colDataTmp, false);
          if (code) {
            code = TSDB_CODE_INVALID_MSG;
            TSDB_CHECK_CODE(code, lino, end);
          }
          STREAM_CHECK_RET_GOTO(setColData(blockStart, rowStart, rowEnd, &colDataTmp, pColData));
          break;
        }
        // Not the wanted column: decode in "jump" mode to move past it.
        code = tDecodeColData(version, pCoder, &colData, true);
        if (code) {
          code = TSDB_CODE_INVALID_MSG;
          TSDB_CHECK_CODE(code, lino, end);
        }
      }
      // Column absent from the submit: the output rows become NULL.
      if (j == nColData) {
        colDataSetNNULL(pColData, blockStart, numOfRows);
      }
    }
  } else {
    uint64_t nRow = 0;
    if (tDecodeU64v(pCoder, &nRow) < 0) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, end);
    }
    for (int32_t iRow = 0; iRow < nRow; ++iRow) {
      SRow *pRow = (SRow *)(pCoder->data + pCoder->pos);
      pCoder->pos += pRow->len;

      if (iRow == 0){
#ifndef NO_UNALIGNED_ACCESS
        walMeta.skey = pRow->ts;
#else
        walMeta.skey = taosGetInt64Aligned(&pRow->ts);
#endif
      }
      if (iRow == nRow - 1) {
#ifndef NO_UNALIGNED_ACCESS
        walMeta.ekey = pRow->ts;
#else
        walMeta.ekey = taosGetInt64Aligned(&pRow->ts);
#endif
      }

      if (pRow->ts < window.skey || pRow->ts > window.ekey) {
        continue;
      }
      STREAM_CHECK_NULL_GOTO(pSlice, TSDB_CODE_INVALID_PARA);
      blockStart = pSlice->currentRowIdx;

      for (int16_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {  // reader todo test null
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
        STREAM_CHECK_NULL_GOTO(pColData, terrno);
        if (pColData->info.colId <= -1) {
          pColData->hasNull = true;
          continue;
        }
        int16_t colId = 0;
        if (sStreamReaderInfo->isVtableStream){
          STREAM_CHECK_RET_GOTO(getColId(submitTbData.suid, submitTbData.uid, i, sStreamReaderInfo, rsp, &colId));
          ST_TASK_TLOG("%s vtable colId:%d, i:%d, uid:%" PRId64, __func__, colId, i, submitTbData.uid);
        } else {
          colId = pColData->info.colId;
        }

        // Linear search of the row's schema columns for the wanted column id.
        SColVal colVal = {0};
        int32_t sourceIdx = 0;
        while (1) {
          if (sourceIdx >= schema->numOfCols) {
            break;
          }
          STREAM_CHECK_RET_GOTO(tRowGet(pRow, schema, sourceIdx, &colVal));
          if (colVal.cid == colId) {
            break;
          }
          sourceIdx++;
        }
        if (colVal.cid == colId && COL_VAL_IS_VALUE(&colVal)) {
          if (IS_VAR_DATA_TYPE(colVal.value.type) || colVal.value.type == TSDB_DATA_TYPE_DECIMAL){
            STREAM_CHECK_RET_GOTO(varColSetVarData(pColData, blockStart+ numOfRows, (const char*)colVal.value.pData, colVal.value.nData, !COL_VAL_IS_VALUE(&colVal)));
            ST_TASK_TLOG("%s vtable colId:%d, i:%d, colData:%p, data:%s, len:%d, rowIndex:%d, offset:%d, uid:%" PRId64, __func__, colId, i, pColData,
              (const char*)colVal.value.pData, colVal.value.nData, blockStart+ numOfRows, pColData->varmeta.offset[blockStart+ numOfRows], submitTbData.uid);
          } else {
            STREAM_CHECK_RET_GOTO(colDataSetVal(pColData, blockStart + numOfRows, (const char*)(&(colVal.value.val)), !COL_VAL_IS_VALUE(&colVal)));
          }
        } else {
          // Column not found in the row, or present without a value.
          colDataSetNULL(pColData, blockStart + numOfRows);
        }
      }

      numOfRows++;
    }
  }

  if (numOfRows > 0) {
    if (!sStreamReaderInfo->isVtableStream) {
      STREAM_CHECK_RET_GOTO(processTag(sStreamReaderInfo, rsp->isCalc, submitTbData.uid, pBlock, blockStart, numOfRows, 1));
    }

    // The last column of the block receives the WAL version for every copied row.
    SColumnInfoData* pColData = taosArrayGetLast(pBlock->pDataBlock);
    STREAM_CHECK_NULL_GOTO(pColData, terrno);
    STREAM_CHECK_RET_GOTO(colDataSetNItems(pColData, blockStart, (const char*)&ver, numOfRows, 1, false));

    STREAM_CHECK_NULL_GOTO(pSlice, TSDB_CODE_INVALID_PARA);
    ST_TASK_DLOG("%s process submit data:skey %" PRId64 ", ekey %" PRId64 ", id %" PRIu64
      ", uid:%" PRId64 ", ver:%"PRId64 ", row index:%d, rows:%d", __func__, window.skey, window.ekey,
      id, submitTbData.uid, ver, pSlice->currentRowIdx, numOfRows);
    pSlice->currentRowIdx += numOfRows;
    pBlock->info.rows += numOfRows;
  } else {
    ST_TASK_DLOG("%s no valid data in time range:skey %" PRId64 ", ekey %" PRId64 ", uid:%" PRId64 ", suid:%" PRId64,
      __func__, window.skey, window.ekey, submitTbData.uid, submitTbData.suid);
  }

  if (gidHash == NULL) goto end;

  // Widen the stored [skey, ekey] for this group, or insert it on first sight.
  WalMetaResult* data = (WalMetaResult*)tSimpleHashGet(gidHash, &walMeta.id, LONG_BYTES);
  if (data != NULL) {
    if (walMeta.skey < data->skey) data->skey = walMeta.skey;
    if (walMeta.ekey > data->ekey) data->ekey = walMeta.ekey;
  } else {
    STREAM_CHECK_RET_GOTO(tSimpleHashPut(gidHash, &walMeta.id, LONG_BYTES, &walMeta, sizeof(WalMetaResult)));
  }

end:
  if (code != 0) {
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  tEndDecode(pCoder);
  return code;
}
1742
/*
 * Decode a whole submit message (`data`, `len`) and process each per-table
 * entry with scanSubmitTbData().
 *
 * When rsp->metaBlock is set, a temporary hash keyed by group id collects one
 * WalMetaResult per group while the entries are scanned; afterwards one row
 * per group is appended to rsp->metaBlock via buildWalMetaBlockNew() and
 * rsp->totalRows is bumped accordingly.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_MSG when the envelope cannot be
 * decoded, or the first error reported by a helper. The temporary hash and
 * the decoder are released on every path.
 */
static int32_t scanSubmitData(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamReaderInfo,
  void* data, int32_t len, SSHashObj* ranges, SSTriggerWalNewRsp* rsp, int64_t ver) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SDecoder   decoder = {0};
  SSHashObj* gidHash = NULL;
  void*      pTask = sStreamReaderInfo->pTask;
  uint64_t   nSubmitTbData = 0;

  tDecoderInit(&decoder, data, len);
  if (tStartDecode(&decoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  if (tDecodeU64v(&decoder, &nSubmitTbData) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  // Per-group meta is only gathered when the caller asked for a meta block.
  if (rsp->metaBlock != NULL) {
    gidHash = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    STREAM_CHECK_NULL_GOTO(gidHash, terrno);
  }

  int32_t tbIdx = 0;
  while (tbIdx < nSubmitTbData) {
    STREAM_CHECK_RET_GOTO(scanSubmitTbData(pVnode, &decoder, sStreamReaderInfo, ranges, gidHash, rsp, ver));
    tbIdx++;
  }

  tEndDecode(&decoder);

  if (rsp->metaBlock != NULL) {
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->metaBlock, ((SSDataBlock*)rsp->metaBlock)->info.rows + tSimpleHashGetSize(gidHash)));

    int32_t iter = 0;
    for (void* px = tSimpleHashIterate(gidHash, NULL, &iter); px != NULL; px = tSimpleHashIterate(gidHash, px, &iter)) {
      WalMetaResult* pMeta = (WalMetaResult*)px;
      STREAM_CHECK_RET_GOTO(buildWalMetaBlockNew(rsp->metaBlock, pMeta->id, pMeta->skey, pMeta->ekey, ver));
      ((SSDataBlock*)rsp->metaBlock)->info.rows++;
      rsp->totalRows++;
      ST_TASK_DLOG("%s process meta data:skey %" PRId64 ", ekey %" PRId64 ", id %" PRIu64
            ", ver:%"PRId64, __func__, pMeta->skey, pMeta->ekey, pMeta->id, ver);
    }
  }

end:
  tSimpleHashCleanup(gidHash);
  tDecoderClear(&decoder);
  return code;
}
1794

1795
/*
 * Pre-scan one per-table submit entry: report its uid, group id and how many
 * of its rows fall inside the table's time window, without copying any data.
 *
 * - An embedded auto-create-table request is decoded and handed to
 *   processAutoCreateTableNew().
 * - Tables rejected by uidInTableListSet() are skipped with success
 *   (*numOfRows is left untouched).
 * - When rsp->uidHash exists, the uid -> gid mapping is recorded in it.
 * - When `ranges` is given and has no window for the group id, the entry is
 *   skipped with success.
 * - *numOfRows is incremented per matching row in the row-format path, so the
 *   caller is expected to pass it in as 0.
 *
 * Returns 0 on success or skip, TSDB_CODE_INVALID_MSG for undecodable input,
 * or the error of a failing helper. The decoded create-table request (if
 * any) is released and tEndDecode(pCoder) is called on every path.
 */
static int32_t scanSubmitTbDataPre(SDecoder *pCoder, SStreamTriggerReaderInfo* sStreamReaderInfo, SSHashObj* ranges,
  uint64_t* gid, int64_t* uid, int32_t* numOfRows, SSTriggerWalNewRsp* rsp, int64_t ver) {
  int32_t code = 0;
  int32_t lino = 0;
  void* pTask = sStreamReaderInfo->pTask;
  // Must be initialized before the first `goto end`: the cleanup code reads
  // submitTbData.pCreateTbReq, and a jump over the initializer would leave it
  // indeterminate.
  SSubmitTbData submitTbData = {0};
  uint8_t       version = 0;

  if (tStartDecode(pCoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }
  // Second byte of the decoded flags carries the version; the low byte is the flag set.
  version = (submitTbData.flags >> 8) & 0xff;
  submitTbData.flags = submitTbData.flags & 0xff;

  // STREAM_CHECK_CONDITION_GOTO(version < 2, TDB_CODE_SUCCESS);
  if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
    submitTbData.pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
    STREAM_CHECK_NULL_GOTO(submitTbData.pCreateTbReq, terrno);
    STREAM_CHECK_RET_GOTO(tDecodeSVCreateTbReq(pCoder, submitTbData.pCreateTbReq));
    STREAM_CHECK_RET_GOTO(processAutoCreateTableNew(sStreamReaderInfo, submitTbData.pCreateTbReq, ver));
  }

  // submit data
  if (tDecodeI64(pCoder, &submitTbData.suid) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }
  if (tDecodeI64(pCoder, uid) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }
  ST_TASK_DLOG("%s uid:%" PRId64 ", suid:%" PRId64, __func__, *uid, submitTbData.suid);
  STREAM_CHECK_CONDITION_GOTO(!uidInTableListSet(sStreamReaderInfo, submitTbData.suid, *uid, gid, rsp->isCalc), TDB_CODE_SUCCESS);
  if (rsp->uidHash != NULL) {
    STREAM_CHECK_RET_GOTO(tSimpleHashPut(rsp->uidHash, uid, LONG_BYTES, gid, LONG_BYTES));
    ST_TASK_DLOG("%s put uid into uidHash, uid:%" PRId64 ", suid:%" PRId64 " gid:%"PRIu64, __func__, *uid, submitTbData.suid, *gid);
  }
  STimeWindow window = {.skey = INT64_MIN, .ekey = INT64_MAX};

  if (ranges != NULL){
    void* timerange = tSimpleHashGet(ranges, gid, sizeof(*gid));
    if (timerange == NULL) goto end;
    int64_t* pRange = (int64_t*)timerange;
    window.skey = pRange[0];
    window.ekey = pRange[1];
  }

  if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    uint64_t nColData = 0;
    if (tDecodeU64v(pCoder, &nColData) < 0) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, end);
    }

    // Only the first encoded column is needed to count rows.
    SColData colData = {0};
    code = tDecodeColData(version, pCoder, &colData, false);
    if (code) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, end);
    }

    if (colData.flag != HAS_VALUE) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, end);
    }
    int32_t rowStart = 0;
    int32_t rowEnd = 0;
    if (window.skey != INT64_MIN || window.ekey != INT64_MAX) {
      STREAM_CHECK_RET_GOTO(getRowRange(&colData, &window, &rowStart, &rowEnd, numOfRows));
    } else {
      // Unbounded window: every row counts.
      (*numOfRows) = colData.nVal;
    }
  } else {
    uint64_t nRow = 0;
    if (tDecodeU64v(pCoder, &nRow) < 0) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, end);
    }

    if (window.skey != INT64_MIN || window.ekey != INT64_MAX) {
      for (int32_t iRow = 0; iRow < nRow; ++iRow) {
        SRow *pRow = (SRow *)(pCoder->data + pCoder->pos);
        pCoder->pos += pRow->len;
        if (pRow->ts < window.skey || pRow->ts > window.ekey) {
          continue;
        }
        (*numOfRows)++;
      }
    } else {
      // Unbounded window: no need to walk the rows.
      (*numOfRows) = nRow;
    }
  }

end:
  tDestroySVSubmitCreateTbReq(submitTbData.pCreateTbReq, TSDB_MSG_FLG_DECODE);
  taosMemoryFreeClear(submitTbData.pCreateTbReq);
  tEndDecode(pCoder);
  return code;
}
1905

1906
/*
 * Pre-scan a whole submit message (`data`, `len`): for every per-table entry
 * count the rows that qualify (via scanSubmitTbDataPre()) and account for
 * them in `rsp`.
 *
 * For each table with at least one qualifying row, rsp->totalRows and
 * rsp->totalDataRows grow by that count, and rsp->indexHash gets (or updates)
 * the table's SStreamWalDataSlice: numRows accumulates, gId is refreshed.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_MSG when the envelope cannot be
 * decoded, or the first helper error. The decoder is cleared on every path.
 */
static int32_t scanSubmitDataPre(SStreamTriggerReaderInfo* sStreamReaderInfo, void* data, int32_t len, SSHashObj* ranges, SSTriggerWalNewRsp* rsp, int64_t ver) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SDecoder decoder = {0};
  void*    pTask = sStreamReaderInfo->pTask;
  uint64_t nSubmitTbData = 0;

  tDecoderInit(&decoder, data, len);
  if (tStartDecode(&decoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  if (tDecodeU64v(&decoder, &nSubmitTbData) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }
  ST_TASK_DLOG("%s nSubmitTbData:%" PRIu64 ", ver:%"PRId64 " bodyLen:%d", __func__, nSubmitTbData, ver, len);

  for (int32_t tbIdx = 0; tbIdx < nSubmitTbData; tbIdx++) {
    uint64_t gid = -1;
    int64_t  uid = 0;
    int32_t  rowCnt = 0;
    STREAM_CHECK_RET_GOTO(scanSubmitTbDataPre(&decoder, sStreamReaderInfo, ranges, &gid, &uid, &rowCnt, rsp, ver));
    if (rowCnt <= 0) {
      ST_TASK_DLOG("%s no valid data uid:%" PRId64 ", gid:%" PRIu64 ", numOfRows:%d, ver:%"PRId64, __func__, uid, gid, rowCnt, ver);
      continue;
    }
    rsp->totalRows += rowCnt;
    rsp->totalDataRows += rowCnt;

    SStreamWalDataSlice* pSlice = (SStreamWalDataSlice*)tSimpleHashGet(rsp->indexHash, &uid, LONG_BYTES);
    if (pSlice == NULL) {
      // First time this table shows up: register a fresh slice.
      SStreamWalDataSlice fresh = {.gId=gid,.numRows=rowCnt,.currentRowIdx=0,.startRowIdx=0};
      ST_TASK_DLOG("%s first uid:%" PRId64 ", gid:%" PRIu64 ", numOfRows:%d, hash:%p %d, ver:%"PRId64, __func__, uid, gid, fresh.numRows, rsp->indexHash, tSimpleHashGetSize(rsp->indexHash), ver);
      STREAM_CHECK_RET_GOTO(tSimpleHashPut(rsp->indexHash, &uid, LONG_BYTES, &fresh, sizeof(fresh)));
      continue;
    }

    // Table already known: grow its row count and refresh the group id.
    pSlice->numRows += rowCnt;
    ST_TASK_DLOG("%s again uid:%" PRId64 ", gid:%" PRIu64 ", total numOfRows:%d, hash:%p %d, ver:%"PRId64, __func__, uid, gid, pSlice->numRows, rsp->indexHash, tSimpleHashGetSize(rsp->indexHash), ver);
    pSlice->gId = gid;
  }

  tEndDecode(&decoder);

end:
  tDecoderClear(&decoder);
  return code;
}
1955

1956
/*
 * Assign row offsets to every slice in indexHash, in hash iteration order:
 * each slice's startRowIdx and currentRowIdx are set to the running total of
 * numRows of the slices visited before it.
 */
static void buildIndexHash(SSHashObj* indexHash, void* pTask){
  int32_t iter = 0;
  int32_t nextRow = 0;

  for (void* pe = tSimpleHashIterate(indexHash, NULL, &iter); pe != NULL; pe = tSimpleHashIterate(indexHash, pe, &iter)) {
    SStreamWalDataSlice* pSlice = (SStreamWalDataSlice*)pe;
    pSlice->startRowIdx = nextRow;
    pSlice->currentRowIdx = nextRow;
    nextRow += pSlice->numRows;
    ST_TASK_DLOG("%s uid:%" PRId64 ", gid:%" PRIu64 ", startRowIdx:%d, numRows:%d", __func__, *(int64_t*)(tSimpleHashGetKey(pe, NULL)),
    pSlice->gId, pSlice->startRowIdx, pSlice->numRows);
  }
}
573,452✔
1969

1970
/*
 * Trace-log every slice in indexHash (uid key, group id, start row, row
 * count). Does nothing unless DEBUG_TRACE is set in qDebugFlag.
 */
static void printIndexHash(SSHashObj* indexHash, void* pTask){
  if (!(qDebugFlag & DEBUG_TRACE)) {
    return;
  }

  int32_t iter = 0;
  void*   pe = tSimpleHashIterate(indexHash, NULL, &iter);
  while (pe != NULL) {
    SStreamWalDataSlice* pSlice = (SStreamWalDataSlice*)pe;
    ST_TASK_TLOG("%s uid:%" PRId64 ", gid:%" PRIu64 ", startRowIdx:%d, numRows:%d", __func__, *(int64_t*)(tSimpleHashGetKey(pe, NULL)),
    pSlice->gId, pSlice->startRowIdx, pSlice->numRows);
    pe = tSimpleHashIterate(indexHash, pe, &iter);
  }
}
573,452✔
1981

1982
/*
 * Recompute slice offsets and row counts after a filter pass.
 *
 * pRet->pData is read as one int8_t indicator per row, consumed in hash
 * iteration order across all slices; a zero indicator drops that row from
 * its slice's numRows. When pRet->pData is NULL no row is dropped. Each
 * slice's startRowIdx becomes the running total of the surviving rows of the
 * slices visited before it.
 */
static void filterIndexHash(SSHashObj* indexHash, SColumnInfoData* pRet){
  int8_t* pIndicator = (int8_t*)pRet->pData;
  int32_t iter = 0;
  int32_t nextStart = 0;
  int32_t cursor = 0;  // position in pIndicator, shared by all slices

  for (void* pe = tSimpleHashIterate(indexHash, NULL, &iter); pe != NULL; pe = tSimpleHashIterate(indexHash, pe, &iter)) {
    SStreamWalDataSlice* pSlice = (SStreamWalDataSlice*)pe;
    pSlice->startRowIdx = nextStart;

    int32_t kept = pSlice->numRows;
    if (pIndicator != NULL) {
      for (int32_t i = 0; i < pSlice->numRows; i++) {
        if (!pIndicator[cursor++]) {
          kept--;
        }
      }
    }

    pSlice->numRows = kept;
    nextStart += pSlice->numRows;
    stTrace("stream reader re build index hash uid:%" PRId64 ", gid:%" PRIu64 ", startRowIdx:%d, numRows:%d", *(int64_t*)(tSimpleHashGetKey(pe, NULL)),
    pSlice->gId, pSlice->startRowIdx, pSlice->numRows);
  }
}
11,179✔
2003

2004
// First pass over the WAL starting at resultRsp->ver.
// Submit messages are handed to scanSubmitDataPre, every other message type to
// processMeta. resultRsp->ver / verTime are advanced as messages are consumed.
// The scan stops when the WAL reports TSDB_CODE_WAL_LOG_NOT_EXIST (treated as
// success), when resultRsp->totalRows reaches STREAM_RETURN_ROWS_NUM, or when
// resultRsp->needReturn is set. Any other error code is returned as-is.
static int32_t prepareIndexMetaData(SWalReader* pWalReader, SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* resultRsp) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   pTask = sStreamReaderInfo->pTask;

  code = walReaderSeekVer(pWalReader, resultRsp->ver);
  if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) {
    if (resultRsp->ver >= walGetFirstVer(pWalReader->pWal)) {
      resultRsp->verTime = taosGetTimestampUs();
    } else {
      // requested version is below the first WAL version: restart from the first one
      resultRsp->ver = walGetFirstVer(pWalReader->pWal);
      resultRsp->verTime = 0;
    }
    ST_TASK_DLOG("%s scan wal end:%s",  __func__, tstrerror(code));
    code = TSDB_CODE_SUCCESS;
    goto end;
  }
  STREAM_CHECK_RET_GOTO(code);

  do {
    code = walNextValidMsg(pWalReader, true);
    if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) {
      resultRsp->verTime = taosGetTimestampUs();
      ST_TASK_DLOG("%s scan wal end:%s", __func__, tstrerror(code));
      code = TSDB_CODE_SUCCESS;
      goto end;
    }
    STREAM_CHECK_RET_GOTO(code);

    resultRsp->ver = pWalReader->curVersion;
    SWalCont* pCont = &pWalReader->pHead->head;
    resultRsp->verTime = pCont->ingestTs;

    int32_t metaLen = pCont->bodyLen - sizeof(SMsgHead);
    int64_t walVer = pCont->version;
    ST_TASK_DLOG("%s scan wal ver:%" PRId64 ", type:%s, deleteData:%d, deleteTb:%d, msg len:%d", __func__,
      walVer, TMSG_INFO(pCont->msgType), sStreamReaderInfo->deleteReCalc, sStreamReaderInfo->deleteOutTbl, metaLen);

    if (pCont->msgType != TDMT_VND_SUBMIT) {
      void* metaBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
      STREAM_CHECK_RET_GOTO(processMeta(pCont->msgType, sStreamReaderInfo, metaBody, metaLen, resultRsp, walVer));
    } else {
      // submit payload starts after the SSubmitReq2Msg header
      void*   submitBody = POINTER_SHIFT(pCont->body, sizeof(SSubmitReq2Msg));
      int32_t submitLen = pCont->bodyLen - sizeof(SSubmitReq2Msg);
      STREAM_CHECK_RET_GOTO(scanSubmitDataPre(sStreamReaderInfo, submitBody, submitLen, NULL, resultRsp, walVer));
    }

    ST_TASK_DLOG("%s scan wal next ver:%" PRId64 ", totalRows:%d", __func__, resultRsp->ver, resultRsp->totalRows);
  } while (resultRsp->totalRows < STREAM_RETURN_ROWS_NUM && !resultRsp->needReturn);

end:
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
2058

2059
static int32_t prepareIndexData(SWalReader* pWalReader, SStreamTriggerReaderInfo* sStreamReaderInfo, 
9,098,243✔
2060
  SArray* versions, SSHashObj* ranges, SSTriggerWalNewRsp* rsp){
2061
  int32_t      code = 0;
9,098,243✔
2062
  int32_t      lino = 0;
9,098,243✔
2063
  void* pTask = sStreamReaderInfo->pTask;
9,098,243✔
2064

2065
  for(int32_t i = 0; i < taosArrayGetSize(versions); i++) {
17,327,660✔
2066
    int64_t *ver = taosArrayGet(versions, i);
8,229,657✔
2067
    if (ver == NULL) continue;
8,229,657✔
2068

2069
    STREAM_CHECK_RET_GOTO(walFetchHead(pWalReader, *ver));
8,229,657✔
2070
    if(pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
8,229,205✔
UNCOV
2071
      TAOS_CHECK_RETURN(walSkipFetchBody(pWalReader));
×
UNCOV
2072
      ST_TASK_TLOG("%s not data, skip, ver:%"PRId64, __func__, *ver);
×
UNCOV
2073
      continue;
×
2074
    }
2075
    STREAM_CHECK_RET_GOTO(walFetchBody(pWalReader));
8,229,205✔
2076

2077
    SWalCont* wCont = &pWalReader->pHead->head;
8,229,417✔
2078
    void*   pBody = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
8,229,657✔
2079
    int32_t bodyLen = wCont->bodyLen - sizeof(SSubmitReq2Msg);
8,229,657✔
2080

2081
    STREAM_CHECK_RET_GOTO(scanSubmitDataPre(sStreamReaderInfo, pBody, bodyLen, ranges, rsp, *ver));
8,229,657✔
2082
  }
2083
  
2084
end:
9,098,243✔
2085
  return code;
9,098,243✔
2086
}
2087

2088
// Run the reader's filter over resultRsp->dataBlock. If the filter reduced the
// row count, the per-table index hash is re-based with filterIndexHash using
// the indicator column produced by qStreamFilter. The indicator column is
// always released before returning.
static int32_t filterData(SSTriggerWalNewRsp* resultRsp, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t          code = 0;
  int32_t          lino = 0;
  SColumnInfoData* pRet = NULL;
  SSDataBlock*     pBlock = (SSDataBlock*)resultRsp->dataBlock;

  int64_t rowsBefore = pBlock->info.rows;
  STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, sStreamReaderInfo->pFilterInfo, &pRet));

  bool rowsRemoved = pBlock->info.rows < rowsBefore;
  if (rowsRemoved) {
    filterIndexHash(resultRsp->indexHash, pRet);
  }

end:
  colDataDestroy(pRet);
  taosMemoryFree(pRet);
  return code;
}
2105

2106
static int32_t processWalVerMetaDataNew(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamReaderInfo, 
3,259,126✔
2107
                                    SSTriggerWalNewRsp* resultRsp) {
2108
  int32_t      code = 0;
3,259,126✔
2109
  int32_t      lino = 0;
3,259,126✔
2110
  void* pTask = sStreamReaderInfo->pTask;
3,259,126✔
2111
                                        
2112
  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
3,259,369✔
2113
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);
3,258,340✔
2114
  blockDataEmpty(resultRsp->dataBlock);
3,258,340✔
2115
  blockDataEmpty(resultRsp->metaBlock);
3,254,108✔
2116
  int64_t lastVer = resultRsp->ver;                                      
3,255,472✔
2117
  STREAM_CHECK_RET_GOTO(prepareIndexMetaData(pWalReader, sStreamReaderInfo, resultRsp));
3,255,934✔
2118
  STREAM_CHECK_CONDITION_GOTO(resultRsp->totalRows == 0, TDB_CODE_SUCCESS);
3,259,065✔
2119

2120
  buildIndexHash(resultRsp->indexHash, pTask);
115,920✔
2121
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(((SSDataBlock*)resultRsp->dataBlock), resultRsp->totalRows));
115,920✔
2122
  while(lastVer < resultRsp->ver) {
2,735,477✔
2123
    STREAM_CHECK_RET_GOTO(walFetchHead(pWalReader, lastVer++));
2,619,557✔
2124
    if(pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
2,618,889✔
2125
      TAOS_CHECK_RETURN(walSkipFetchBody(pWalReader));
42,911✔
2126
      continue;
42,911✔
2127
    }
2128
    STREAM_CHECK_RET_GOTO(walFetchBody(pWalReader));
2,575,756✔
2129
    SWalCont* wCont = &pWalReader->pHead->head;
2,575,943✔
2130
    void*   pBody = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
2,576,410✔
2131
    int32_t bodyLen = wCont->bodyLen - sizeof(SSubmitReq2Msg);
2,576,410✔
2132
    ST_TASK_DLOG("process wal ver:%" PRId64 ", type:%d, bodyLen:%d", wCont->version, wCont->msgType, bodyLen);
2,576,650✔
2133
    STREAM_CHECK_RET_GOTO(scanSubmitData(pVnode, sStreamReaderInfo, pBody, bodyLen, NULL, resultRsp, wCont->version));
2,576,650✔
2134
  }
2135

2136
  int32_t metaRows = resultRsp->totalRows - ((SSDataBlock*)resultRsp->dataBlock)->info.rows;
115,920✔
2137
  STREAM_CHECK_RET_GOTO(filterData(resultRsp, sStreamReaderInfo));
115,920✔
2138
  resultRsp->totalRows = ((SSDataBlock*)resultRsp->dataBlock)->info.rows + metaRows;
115,920✔
2139

2140
end:
3,258,840✔
2141
  ST_TASK_DLOG("vgId:%d %s end, get result totalRows:%d, process:%"PRId64"/%"PRId64, TD_VID(pVnode), __func__, 
3,258,840✔
2142
          resultRsp->totalRows, resultRsp->ver, walGetAppliedVer(pWalReader->pWal));
2143
  walCloseReader(pWalReader);
3,259,050✔
2144
  return code;
3,259,048✔
2145
}
2146

2147
static int32_t processWalVerDataNew(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamReaderInfo, 
9,098,993✔
2148
                                    SArray* versions, SSHashObj* ranges, SSTriggerWalNewRsp* rsp) {
2149
  int32_t      code = 0;
9,098,993✔
2150
  int32_t      lino = 0;
9,098,993✔
2151

2152
  void* pTask = sStreamReaderInfo->pTask;
9,098,993✔
2153
  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
9,098,993✔
2154
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);
9,098,766✔
2155
  
2156
  if (taosArrayGetSize(versions) > 0) {
9,098,766✔
2157
    rsp->ver = *(int64_t*)taosArrayGetLast(versions);
457,532✔
2158
  }
2159
  
2160
  STREAM_CHECK_RET_GOTO(prepareIndexData(pWalReader, sStreamReaderInfo, versions, ranges, rsp));
9,098,766✔
2161
  STREAM_CHECK_CONDITION_GOTO(rsp->totalRows == 0, TDB_CODE_SUCCESS);
9,098,243✔
2162

2163
  ST_TASK_TLOG("%s index hash:%p %d", __func__, rsp->indexHash, tSimpleHashGetSize(rsp->indexHash));
457,532✔
2164
  buildIndexHash(rsp->indexHash, pTask);
457,532✔
2165

2166
  blockDataEmpty(rsp->dataBlock);
457,532✔
2167
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->dataBlock, rsp->totalRows));
457,532✔
2168

2169
  for(int32_t i = 0; i < taosArrayGetSize(versions); i++) {
8,687,189✔
2170
    int64_t *ver = taosArrayGet(versions, i);
8,229,657✔
2171
    if (ver == NULL) continue;
8,229,657✔
2172
    ST_TASK_TLOG("vgId:%d %s scan wal process:%"PRId64"/%"PRId64, TD_VID(pVnode), __func__, *ver, walGetAppliedVer(pWalReader->pWal));
8,229,657✔
2173

2174
    STREAM_CHECK_RET_GOTO(walFetchHead(pWalReader, *ver));
8,229,657✔
2175
    if(pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
8,229,657✔
UNCOV
2176
      TAOS_CHECK_RETURN(walSkipFetchBody(pWalReader));
×
UNCOV
2177
      continue;
×
2178
    }
2179
    STREAM_CHECK_RET_GOTO(walFetchBody(pWalReader));
8,229,657✔
2180
    SWalCont* wCont = &pWalReader->pHead->head;
8,229,432✔
2181
    void*   pBody = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
8,229,432✔
2182
    int32_t bodyLen = wCont->bodyLen - sizeof(SSubmitReq2Msg);
8,229,432✔
2183

2184
    STREAM_CHECK_RET_GOTO(scanSubmitData(pVnode, sStreamReaderInfo, pBody, bodyLen, ranges, rsp, wCont->version));
8,229,432✔
2185
  }
2186
  // printDataBlock(rsp->dataBlock, __func__, "processWalVerDataNew");
2187
  STREAM_CHECK_RET_GOTO(filterData(rsp, sStreamReaderInfo));
457,532✔
2188
  rsp->totalRows = ((SSDataBlock*)rsp->dataBlock)->info.rows;
457,532✔
2189

2190
end:
9,098,993✔
2191
  ST_TASK_DLOG("vgId:%d %s end, get result totalRows:%d, process:%"PRId64"/%"PRId64, TD_VID(pVnode), __func__, 
9,098,993✔
2192
            rsp->totalRows, rsp->ver, walGetAppliedVer(pWalReader->pWal));
2193
  walCloseReader(pWalReader);
9,098,993✔
2194
  return code;
9,098,993✔
2195
}
2196

2197
// Build an array of SSchema (one per column) for table `uid`.
//
// For a child table the schema is taken from its super table entry; for a
// normal table from the table entry itself. Any other table type is rejected
// with TSDB_CODE_INVALID_PARA.
//
// On success *schemas owns a newly allocated array that the caller must
// destroy. On failure the array is destroyed here and *schemas is set to NULL.
static int32_t buildScheamFromMeta(SVnode* pVnode, int64_t uid, SArray** schemas, SStorageAPI* api) {
  int32_t code = 0;
  int32_t lino = 0;
  SMetaReader metaReader = {0};
  *schemas = taosArrayInit(8, sizeof(SSchema));
  STREAM_CHECK_NULL_GOTO(*schemas, terrno);
  
  api->metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api->metaFn);
  STREAM_CHECK_RET_GOTO(api->metaReaderFn.getTableEntryByUid(&metaReader, uid));

  SSchemaWrapper* sSchemaWrapper = NULL;
  if (metaReader.me.type == TD_CHILD_TABLE) {
    int64_t suid = metaReader.me.ctbEntry.suid;
    // release the decoder of the child entry before reusing the reader for the super table
    tDecoderClear(&metaReader.coder);
    STREAM_CHECK_RET_GOTO(api->metaReaderFn.getTableEntryByUid(&metaReader, suid));
    sSchemaWrapper = &metaReader.me.stbEntry.schemaRow;
  } else if (metaReader.me.type == TD_NORMAL_TABLE) {
    sSchemaWrapper = &metaReader.me.ntbEntry.schemaRow;
  } else {
    // no schema available: fail instead of dereferencing a NULL wrapper below
    qError("invalid table type:%d", metaReader.me.type);
    code = TSDB_CODE_INVALID_PARA;
    lino = __LINE__;
    goto end;
  }

  for (int32_t j = 0; j < sSchemaWrapper->nCols; j++) {
    SSchema* s = sSchemaWrapper->pSchema + j;
    STREAM_CHECK_NULL_GOTO(taosArrayPush(*schemas, s), terrno);
  }

end:
  api->metaReaderFn.clearReader(&metaReader);
  STREAM_PRINT_LOG_END(code, lino);
  if (code != 0)  {
    taosArrayDestroy(*schemas);
    *schemas = NULL;
  }
  return code;
}
2233

2234
// Reduce `schemas` to the entries whose colId appears in `cols`, ordered like
// `cols`. Matching entries are appended behind the original ones (capacity is
// reserved up front) and the original prefix is then popped from the front.
// Column ids without a matching schema are silently ignored.
static int32_t shrinkScheams(SArray* cols, SArray* schemas) {
  int32_t code = 0;
  int32_t lino = 0;
  size_t  origCount = taosArrayGetSize(schemas);

  STREAM_CHECK_RET_GOTO(taosArrayEnsureCap(schemas, origCount + taosArrayGetSize(cols)));

  for (size_t c = 0; c < taosArrayGetSize(cols); c++) {
    col_id_t* wanted = taosArrayGet(cols, c);
    STREAM_CHECK_NULL_GOTO(wanted, terrno);

    // search only the original entries, not the ones appended so far
    size_t k = 0;
    while (k < origCount) {
      SSchema* cand = taosArrayGet(schemas, k);
      STREAM_CHECK_NULL_GOTO(cand, terrno);
      if (cand->colId == *wanted) {
        STREAM_CHECK_NULL_GOTO(taosArrayPush(schemas, cand), terrno);
        break;
      }
      k++;
    }
  }
  taosArrayPopFrontBatch(schemas, origCount);

end:
  return code;
}
2256

2257
static int32_t createTSAndCondition(int64_t start, int64_t end, SLogicConditionNode** pCond,
×
2258
                                    STargetNode* pTargetNodeTs) {
2259
  int32_t code = 0;
×
2260
  int32_t lino = 0;
×
2261

2262
  SColumnNode*         pCol = NULL;
×
UNCOV
2263
  SColumnNode*         pCol1 = NULL;
×
2264
  SValueNode*          pVal = NULL;
×
UNCOV
2265
  SValueNode*          pVal1 = NULL;
×
2266
  SOperatorNode*       op = NULL;
×
2267
  SOperatorNode*       op1 = NULL;
×
2268
  SLogicConditionNode* cond = NULL;
×
2269

2270
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pCol));
×
UNCOV
2271
  pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
×
2272
  pCol->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
×
2273
  pCol->node.resType.bytes = LONG_BYTES;
×
2274
  pCol->slotId = pTargetNodeTs->slotId;
×
UNCOV
2275
  pCol->dataBlockId = pTargetNodeTs->dataBlockId;
×
2276

2277
  STREAM_CHECK_RET_GOTO(nodesCloneNode((SNode*)pCol, (SNode**)&pCol1));
×
2278

2279
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&pVal));
×
2280
  pVal->node.resType.type = TSDB_DATA_TYPE_BIGINT;
×
2281
  pVal->node.resType.bytes = LONG_BYTES;
×
2282
  pVal->datum.i = start;
×
2283
  pVal->typeData = start;
×
2284

2285
  STREAM_CHECK_RET_GOTO(nodesCloneNode((SNode*)pVal, (SNode**)&pVal1));
×
2286
  pVal1->datum.i = end;
×
2287
  pVal1->typeData = end;
×
2288

2289
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&op));
×
2290
  op->opType = OP_TYPE_GREATER_EQUAL;
×
2291
  op->node.resType.type = TSDB_DATA_TYPE_BOOL;
×
2292
  op->node.resType.bytes = CHAR_BYTES;
×
UNCOV
2293
  op->pLeft = (SNode*)pCol;
×
2294
  op->pRight = (SNode*)pVal;
×
2295
  pCol = NULL;
×
2296
  pVal = NULL;
×
2297

2298
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&op1));
×
2299
  op1->opType = OP_TYPE_LOWER_EQUAL;
×
2300
  op1->node.resType.type = TSDB_DATA_TYPE_BOOL;
×
2301
  op1->node.resType.bytes = CHAR_BYTES;
×
2302
  op1->pLeft = (SNode*)pCol1;
×
UNCOV
2303
  op1->pRight = (SNode*)pVal1;
×
2304
  pCol1 = NULL;
×
UNCOV
2305
  pVal1 = NULL;
×
2306

2307
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_LOGIC_CONDITION, (SNode**)&cond));
×
2308
  cond->condType = LOGIC_COND_TYPE_AND;
×
2309
  cond->node.resType.type = TSDB_DATA_TYPE_BOOL;
×
2310
  cond->node.resType.bytes = CHAR_BYTES;
×
2311
  STREAM_CHECK_RET_GOTO(nodesMakeList(&cond->pParameterList));
×
2312
  STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)op));
×
2313
  op = NULL;
×
2314
  STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)op1));
×
UNCOV
2315
  op1 = NULL;
×
2316

UNCOV
2317
  *pCond = cond;
×
2318

UNCOV
2319
end:
×
UNCOV
2320
  if (code != 0) {
×
UNCOV
2321
    nodesDestroyNode((SNode*)pCol);
×
UNCOV
2322
    nodesDestroyNode((SNode*)pCol1);
×
UNCOV
2323
    nodesDestroyNode((SNode*)pVal);
×
UNCOV
2324
    nodesDestroyNode((SNode*)pVal1);
×
UNCOV
2325
    nodesDestroyNode((SNode*)op);
×
UNCOV
2326
    nodesDestroyNode((SNode*)op1);
×
UNCOV
2327
    nodesDestroyNode((SNode*)cond);
×
2328
  }
UNCOV
2329
  STREAM_PRINT_LOG_END(code, lino);
×
2330

UNCOV
2331
  return code;
×
2332
}
2333

2334
/*
2335
static int32_t createExternalConditions(SStreamRuntimeFuncInfo* data, SLogicConditionNode** pCond, STargetNode* pTargetNodeTs, STimeRangeNode* node) {
2336
  int32_t              code = 0;
2337
  int32_t              lino = 0;
2338
  SLogicConditionNode* pAndCondition = NULL;
2339
  SLogicConditionNode* cond = NULL;
2340

2341
  if (pTargetNodeTs == NULL) {
2342
    vError("stream reader %s no ts column", __func__);
2343
    return TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN;
2344
  }
2345
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_LOGIC_CONDITION, (SNode**)&cond));
2346
  cond->condType = LOGIC_COND_TYPE_OR;
2347
  cond->node.resType.type = TSDB_DATA_TYPE_BOOL;
2348
  cond->node.resType.bytes = CHAR_BYTES;
2349
  STREAM_CHECK_RET_GOTO(nodesMakeList(&cond->pParameterList));
2350

2351
  for (int i = 0; i < taosArrayGetSize(data->pStreamPesudoFuncVals); ++i) {
2352
    data->curIdx = i;
2353

2354
    SReadHandle handle = {0};
2355
    calcTimeRange(node, data, &handle.winRange, &handle.winRangeValid);
2356
    if (!handle.winRangeValid) {
2357
      stError("stream reader %s invalid time range, skey:%" PRId64 ", ekey:%" PRId64, __func__, handle.winRange.skey,
2358
              handle.winRange.ekey);
2359
      continue;
2360
    }
2361
    STREAM_CHECK_RET_GOTO(createTSAndCondition(handle.winRange.skey, handle.winRange.ekey, &pAndCondition, pTargetNodeTs));
2362
    stDebug("%s create condition skey:%" PRId64 ", eksy:%" PRId64, __func__, handle.winRange.skey, handle.winRange.ekey);
2363
    STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)pAndCondition));
2364
    pAndCondition = NULL;
2365
  }
2366

2367
  *pCond = cond;
2368

2369
end:
2370
  if (code != 0) {
2371
    nodesDestroyNode((SNode*)pAndCondition);
2372
    nodesDestroyNode((SNode*)cond);
2373
  }
2374
  STREAM_PRINT_LOG_END(code, lino);
2375

2376
  return code;
2377
}
2378
*/
2379

2380
static int32_t processCalaTimeRange(SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo, SResFetchReq* req,
504,371✔
2381
                                    STimeRangeNode* node, SReadHandle* handle, bool isExtWin) {
2382
  int32_t code = 0;
504,371✔
2383
  int32_t lino = 0;
504,371✔
2384
  void* pTask = sStreamReaderCalcInfo->pTask;
504,371✔
2385
  STimeWindow* pWin = isExtWin ? &handle->extWinRange : &handle->winRange;
504,371✔
2386
  bool* pValid = isExtWin ? &handle->extWinRangeValid : &handle->winRangeValid;
504,371✔
2387
  
2388
  if (req->pStRtFuncInfo->withExternalWindow) {
504,371✔
2389
    sStreamReaderCalcInfo->tmpRtFuncInfo.curIdx = 0;
382,709✔
2390
    sStreamReaderCalcInfo->tmpRtFuncInfo.triggerType = req->pStRtFuncInfo->triggerType;
382,709✔
2391
    sStreamReaderCalcInfo->tmpRtFuncInfo.isWindowTrigger = req->pStRtFuncInfo->isWindowTrigger;
382,709✔
2392
    sStreamReaderCalcInfo->tmpRtFuncInfo.precision = req->pStRtFuncInfo->precision;
382,709✔
2393

2394
    SSTriggerCalcParam* pFirst = taosArrayGet(req->pStRtFuncInfo->pStreamPesudoFuncVals, 0);
382,709✔
2395
    SSTriggerCalcParam* pLast = taosArrayGetLast(req->pStRtFuncInfo->pStreamPesudoFuncVals);
382,709✔
2396
    STREAM_CHECK_NULL_GOTO(pFirst, terrno);
382,709✔
2397
    STREAM_CHECK_NULL_GOTO(pLast, terrno);
382,709✔
2398

2399
    if (!node->needCalc) {
382,709✔
2400
      pWin->skey = pFirst->wstart;
265,499✔
2401
      pWin->ekey = pLast->wend;
265,499✔
2402
      *pValid = true;
265,499✔
2403
      if (req->pStRtFuncInfo->triggerType == STREAM_TRIGGER_SLIDING) {
265,499✔
2404
        pWin->ekey--;
177,215✔
2405
      }
2406
    } else {
2407
      SSTriggerCalcParam* pTmp = taosArrayGet(sStreamReaderCalcInfo->tmpRtFuncInfo.pStreamPesudoFuncVals, 0);
117,210✔
2408
      memcpy(pTmp, pFirst, sizeof(*pTmp));
117,210✔
2409

2410
      STREAM_CHECK_RET_GOTO(streamCalcCurrWinTimeRange(node, &sStreamReaderCalcInfo->tmpRtFuncInfo, pWin, pValid, 1));
117,210✔
2411
      if (*pValid) {
117,210✔
2412
        int64_t skey = pWin->skey;
117,210✔
2413

2414
        memcpy(pTmp, pLast, sizeof(*pTmp));
117,210✔
2415
        STREAM_CHECK_RET_GOTO(streamCalcCurrWinTimeRange(node, &sStreamReaderCalcInfo->tmpRtFuncInfo, pWin, pValid, 2));
117,210✔
2416

2417
        if (*pValid) {
117,210✔
2418
          pWin->skey = skey;
117,210✔
2419
        }
2420
      }
2421
      pWin->ekey--;
117,210✔
2422
    }
2423
  } else {
2424
    if (!node->needCalc) {
121,662✔
2425
      SSTriggerCalcParam* pCurr = taosArrayGet(req->pStRtFuncInfo->pStreamPesudoFuncVals, req->pStRtFuncInfo->curIdx);
79,299✔
2426
      pWin->skey = pCurr->wstart;
79,299✔
2427
      pWin->ekey = pCurr->wend;
79,299✔
2428
      *pValid = true;
79,299✔
2429
      if (req->pStRtFuncInfo->triggerType == STREAM_TRIGGER_SLIDING) {
79,299✔
2430
        pWin->ekey--;
32,410✔
2431
      }
2432
    } else {
2433
      STREAM_CHECK_RET_GOTO(streamCalcCurrWinTimeRange(node, req->pStRtFuncInfo, pWin, pValid, 3));
42,363✔
2434
      pWin->ekey--;
42,363✔
2435
    }
2436
  }
2437

2438
  ST_TASK_DLOG("%s type:%s, withExternalWindow:%d, skey:%" PRId64 ", ekey:%" PRId64 ", validRange:%d", 
504,371✔
2439
      __func__, isExtWin ? "interp range" : "scan time range", req->pStRtFuncInfo->withExternalWindow, pWin->skey, pWin->ekey, *pValid);
2440

2441
end:
5,850✔
2442

2443
  if (code) {
504,371✔
UNCOV
2444
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2445
  }
2446
  
2447
  return code;
504,371✔
2448
}
2449

2450
// Allocate a two-column data block: column 0 is a TIMESTAMP with col id
// PRIMARYKEY_TIMESTAMP_COL_ID, column 1 a BIGINT with the next col id.
// Capacity for numOfRows rows is reserved.
// *pBlockRet receives the block on success and NULL on failure.
static int32_t createDataBlockTsUid(SSDataBlock** pBlockRet, uint32_t numOfRows) {
  int32_t         code = 0;
  int32_t         lino = 0;
  SSDataBlock*    pNewBlock = NULL;
  SColumnInfoData colInfo;

  STREAM_CHECK_RET_GOTO(createDataBlock(&pNewBlock));

  colInfo = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, PRIMARYKEY_TIMESTAMP_COL_ID);
  STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pNewBlock, &colInfo));

  colInfo = createColumnInfoData(TSDB_DATA_TYPE_BIGINT, LONG_BYTES, PRIMARYKEY_TIMESTAMP_COL_ID + 1);
  STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pNewBlock, &colInfo));

  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pNewBlock, numOfRows));

end:
  STREAM_PRINT_LOG_END(code, lino)
  if (code != TSDB_CODE_SUCCESS) {
    blockDataDestroy(pNewBlock);
    pNewBlock = NULL;
  }
  *pBlockRet = pNewBlock;
  return code;
}
2470

2471
// Emit one STsInfo per row of pResBlock into tsRsp->tsInfo.
// Column 0 of pResBlock is read as the timestamp, column 1 as the value
// stored in gId. Rows whose timestamp is NULL are skipped.
static int32_t processTsOutPutAllTables(SStreamTriggerReaderInfo* sStreamReaderInfo, SStreamTsResponse* tsRsp, SSDataBlock* pResBlock, int32_t order) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   pTask = sStreamReaderInfo->pTask;

  tsRsp->tsInfo = taosArrayInit(pResBlock->info.rows, sizeof(STsInfo));
  STREAM_CHECK_NULL_GOTO(tsRsp->tsInfo, terrno);

  SColumnInfoData* tsCol = taosArrayGet(pResBlock->pDataBlock, 0);
  SColumnInfoData* uidCol = taosArrayGet(pResBlock->pDataBlock, 1);

  for (int32_t row = 0; row < pResBlock->info.rows; row++) {
    if (colDataIsNull_s(tsCol, row) || tsCol->pData == NULL) {
      continue;
    }

    STsInfo* tsInfo = taosArrayReserve(tsRsp->tsInfo, 1);
    STREAM_CHECK_NULL_GOTO(tsInfo, terrno)

    // seed with the neutral element for the requested order, then fold the row value in
    tsInfo->ts = (order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;

    int64_t rowTs = *(int64_t*)colDataGetNumData(tsCol, row);
    if (order == TSDB_ORDER_ASC) {
      if (rowTs < tsInfo->ts) {
        tsInfo->ts = rowTs;
      }
    } else if (order == TSDB_ORDER_DESC && rowTs > tsInfo->ts) {
      tsInfo->ts = rowTs;
    }

    tsInfo->gId = *(int64_t*)colDataGetNumData(uidCol, row);
    ST_TASK_DLOG("%s get ts:%" PRId64 ", gId:%" PRIu64 ", ver:%" PRId64, __func__, tsInfo->ts, tsInfo->gId, tsRsp->ver);
  }

end:
  return code;
}
2504

2505
// Collapse all rows of pResBlock into a single STsInfo in tsRsp->tsInfo.
// The timestamp is the minimum of column 0 for TSDB_ORDER_ASC and the maximum
// for TSDB_ORDER_DESC (NULL timestamps are ignored). The group id is looked
// up with qStreamGetGroupIdFromSet from the value in row 0 of column 1.
static int32_t processTsOutPutOneGroup(SStreamTriggerReaderInfo* sStreamReaderInfo, SStreamTsResponse* tsRsp, SSDataBlock* pResBlock, int32_t order) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   pTask = sStreamReaderInfo->pTask;

  tsRsp->tsInfo = taosArrayInit(1, sizeof(STsInfo));
  STREAM_CHECK_NULL_GOTO(tsRsp->tsInfo, terrno);
  STsInfo* tsInfo = taosArrayReserve(tsRsp->tsInfo, 1);
  STREAM_CHECK_NULL_GOTO(tsInfo, terrno)

  // seed with the neutral element for the requested order
  tsInfo->ts = (order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;

  SColumnInfoData* tsCol = taosArrayGet(pResBlock->pDataBlock, 0);
  SColumnInfoData* uidCol = taosArrayGet(pResBlock->pDataBlock, 1);

  for (int32_t row = 0; row < pResBlock->info.rows; row++) {
    if (colDataIsNull_s(tsCol, row) || tsCol->pData == NULL) {
      continue;
    }
    int64_t rowTs = *(int64_t*)colDataGetNumData(tsCol, row);
    if (order == TSDB_ORDER_ASC) {
      if (rowTs < tsInfo->ts) {
        tsInfo->ts = rowTs;
      }
    } else if (order == TSDB_ORDER_DESC && rowTs > tsInfo->ts) {
      tsInfo->ts = rowTs;
    }
  }

  int64_t firstUid = *(int64_t*)colDataGetNumData(uidCol, 0);
  tsInfo->gId = qStreamGetGroupIdFromSet(sStreamReaderInfo, firstUid);
  ST_TASK_DLOG("%s get ts:%" PRId64 ", gId:%" PRIu64 ", ver:%" PRId64, __func__, tsInfo->ts, tsInfo->gId, tsRsp->ver);

end:
  return code;
}
2540

2541
// Emit one STsInfo per table group into tsRsp->tsInfo.
//
// The rows of pResBlock (column 0 timestamp, column 1 uid) are first indexed
// in a uid -> ts hash. The table list copied from sStreamReaderInfo is then
// iterated batch by batch with qStreamIterTableList; for every batch the
// minimum (TSDB_ORDER_ASC) or maximum (TSDB_ORDER_DESC) timestamp of its
// tables is stored, together with the group id of the batch's first table.
//
// Returns 0 on success, otherwise the first error code. All temporaries are
// released on every exit path.
static int32_t processTsOutPutAllGroups(SStreamTriggerReaderInfo* sStreamReaderInfo, SStreamTsResponse* tsRsp, SSDataBlock* pResBlock, int32_t order) {
  int32_t code = 0;
  int32_t lino = 0;
  STableKeyInfo* pList = NULL;
  StreamTableListInfo     tableInfo = {0};
  // must be initialized before the first possible `goto end`, which cleans it up
  SSHashObj*   uidTsHash = NULL;

  void* pTask = sStreamReaderInfo->pTask;
  STREAM_CHECK_RET_GOTO(qStreamCopyTableInfo(sStreamReaderInfo, &tableInfo));

  uidTsHash = tSimpleHashInit(pResBlock->info.rows, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
  STREAM_CHECK_NULL_GOTO(uidTsHash, terrno);
  SColumnInfoData* pColInfoDataTs = taosArrayGet(pResBlock->pDataBlock, 0);
  SColumnInfoData* pColInfoDataUid = taosArrayGet(pResBlock->pDataBlock, 1);
  for (int32_t j = 0; j < pResBlock->info.rows; j++) {
    if (colDataIsNull_s(pColInfoDataTs, j) || pColInfoDataTs->pData == NULL) {
      continue;
    }
    int64_t ts = *(int64_t*)colDataGetNumData(pColInfoDataTs, j);
    int64_t uid = *(int64_t*)colDataGetNumData(pColInfoDataUid, j);
    STREAM_CHECK_RET_GOTO(tSimpleHashPut(uidTsHash, &uid, LONG_BYTES, &ts, LONG_BYTES));
  }
  tsRsp->tsInfo = taosArrayInit(qStreamGetTableListGroupNum(sStreamReaderInfo), sizeof(STsInfo));
  STREAM_CHECK_NULL_GOTO(tsRsp->tsInfo, terrno);
  while (true) {
    int32_t        pNum = 0;
    int64_t        suid = 0;
    STREAM_CHECK_RET_GOTO(qStreamIterTableList(&tableInfo, &pList, &pNum, &suid));
    if(pNum == 0) break;
    STsInfo* tsInfo = taosArrayReserve(tsRsp->tsInfo, 1);
    STREAM_CHECK_NULL_GOTO(tsInfo, terrno)
    if (order == TSDB_ORDER_ASC) {
      tsInfo->ts = INT64_MAX;
    } else {
      tsInfo->ts = INT64_MIN;
    }
    for (int32_t i = 0; i < pNum; i++) {
      int64_t uid = pList[i].uid;
      int64_t *ts = tSimpleHashGet(uidTsHash, &uid, LONG_BYTES);
      // NOTE(review): a table without a row in pResBlock ends the whole scan here with
      // whatever terrno currently holds - confirm this is intended rather than `continue`.
      STREAM_CHECK_NULL_GOTO(ts, terrno);
      if (order == TSDB_ORDER_ASC && *ts < tsInfo->ts) {
        tsInfo->ts = *ts;
      } else if (order == TSDB_ORDER_DESC && *ts > tsInfo->ts) {
        tsInfo->ts = *ts;
      }
    }
    int64_t uid = pList[0].uid;
    tsInfo->gId = qStreamGetGroupIdFromSet(sStreamReaderInfo, uid);
    ST_TASK_DLOG("%s get ts:%" PRId64 ", gId:%" PRIu64 ", ver:%" PRId64, __func__, tsInfo->ts, tsInfo->gId, tsRsp->ver);
    taosMemoryFreeClear(pList);
  }

end:
  qStreamDestroyTableInfo(&tableInfo);
  taosMemoryFreeClear(pList);
  tSimpleHashCleanup(uidTsHash);
  return code;
}
2598

2599
// static bool stReaderTaskWaitQuit(SStreamTask* pTask) { return taosHasRWWFlag(&pTask->entryLock); }
2600

2601
// Create a first/last-timestamp iterator for the pNum tables in pList and
// drain it into pResBlock. The version range is (-1, options->ver]; time
// window, suid and order also come from pTaskInner->options.
// The iterator is destroyed and pTaskInner->pReader reset on every exit path.
static int32_t getAllTs(SVnode* pVnode, SSDataBlock*  pResBlock, SStreamReaderTaskInner* pTaskInner, STableKeyInfo* pList, int32_t pNum) {
  int32_t       code = 0;
  int32_t       lino = 0;
  bool          more = true;
  SVersionRange verRange = {.minVer = -1, .maxVer = pTaskInner->options->ver};

  STREAM_CHECK_RET_GOTO(pTaskInner->storageApi->tsdReader.tsdCreateFirstLastTsIter(
      pVnode, &pTaskInner->options->twindows, &verRange, pTaskInner->options->suid, pList, pNum,
      pTaskInner->options->order, &pTaskInner->pReader, pTaskInner->idStr));

  for (;;) {
    STREAM_CHECK_RET_GOTO(pTaskInner->storageApi->tsdReader.tsdNextFirstLastTsBlock(pTaskInner->pReader, pResBlock, &more));
    STREAM_CHECK_CONDITION_GOTO(!more, TDB_CODE_SUCCESS);
  }

end:
  pTaskInner->storageApi->tsdReader.tsdDestroyFirstLastTsIter(pTaskInner->pReader);
  pTaskInner->pReader = NULL;
  return code;
}
2618

2619
static int32_t processTsVTable(SVnode* pVnode, SStreamTsResponse* tsRsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
227,747✔
2620
                                  SStreamReaderTaskInner* pTaskInner) {
2621
  int32_t code = 0;
227,747✔
2622
  int32_t lino = 0;
227,747✔
2623
  STableKeyInfo* pList = NULL;
227,747✔
2624
  StreamTableListInfo     tableInfo = {0};
227,747✔
2625

2626
  void* pTask = sStreamReaderInfo->pTask;
227,747✔
2627
  STREAM_CHECK_RET_GOTO(qStreamCopyTableInfo(sStreamReaderInfo, &tableInfo));
227,747✔
2628

2629
  SSDataBlock*  pResBlock = NULL;
227,747✔
2630
  STREAM_CHECK_RET_GOTO(createDataBlockTsUid(&pResBlock, qStreamGetTableListNum(sStreamReaderInfo)));
227,747✔
2631

2632
  while (true) {
122,108✔
2633
    int32_t        pNum = 0;
349,855✔
2634
    int64_t        suid = 0;
349,855✔
2635
    STREAM_CHECK_RET_GOTO(qStreamIterTableList(&tableInfo, &pList, &pNum, &suid));
349,855✔
2636
    if(pNum == 0) break;
349,855✔
2637
    pTaskInner->options->suid = suid;
122,108✔
2638
    STREAM_CHECK_RET_GOTO(getAllTs(pVnode, pResBlock, pTaskInner, pList, pNum));
122,108✔
2639
    taosMemoryFreeClear(pList);
122,108✔
2640
  }
2641

2642
  STREAM_CHECK_RET_GOTO(processTsOutPutAllTables(sStreamReaderInfo, tsRsp, pResBlock, pTaskInner->options->order));
227,747✔
2643

2644
end:
227,747✔
2645
  qStreamDestroyTableInfo(&tableInfo);
227,747✔
2646
  taosMemoryFreeClear(pList);
227,747✔
2647
  blockDataDestroy(pResBlock);
227,747✔
2648
  STREAM_PRINT_LOG_END_WITHID(code, lino);
227,747✔
2649
  return code;
227,747✔
2650
}
2651

2652
static int32_t processTsNonVTable(SVnode* pVnode, SStreamTsResponse* tsRsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
206,950✔
2653
                                  SStreamReaderTaskInner* pTaskInner) {
2654
  int32_t code = 0;
206,950✔
2655
  int32_t lino = 0;
206,950✔
2656
  STableKeyInfo* pList = NULL;
206,950✔
2657
  void* pTask = sStreamReaderInfo->pTask;
206,950✔
2658
  
2659
  SSDataBlock*  pResBlock = NULL;
206,950✔
2660

2661
  int32_t        pNum = 0;
206,950✔
2662
  int64_t        suid = 0;
206,950✔
2663
  STREAM_CHECK_RET_GOTO(qStreamGetTableList(sStreamReaderInfo, 0, &pList, &pNum));
206,950✔
2664
  STREAM_CHECK_CONDITION_GOTO(pNum == 0, TSDB_CODE_SUCCESS);
206,950✔
2665
  STREAM_CHECK_RET_GOTO(createDataBlockTsUid(&pResBlock, pNum));
175,979✔
2666

2667
  pTaskInner->options->suid = sStreamReaderInfo->suid;
175,979✔
2668
  STREAM_CHECK_RET_GOTO(getAllTs(pVnode, pResBlock, pTaskInner, pList, pNum));
175,979✔
2669
  STREAM_CHECK_CONDITION_GOTO(pResBlock->info.rows == 0, TDB_CODE_SUCCESS);
175,612✔
2670
  int32_t order = pTaskInner->options->order;
93,237✔
2671

2672
  if (sStreamReaderInfo->groupByTbname) {
92,870✔
2673
    STREAM_CHECK_RET_GOTO(processTsOutPutAllTables(sStreamReaderInfo, tsRsp, pResBlock, order));
46,761✔
2674
  } else if (sStreamReaderInfo->partitionCols == NULL) {
46,109✔
2675
    STREAM_CHECK_RET_GOTO(processTsOutPutOneGroup(sStreamReaderInfo, tsRsp, pResBlock, order));
38,597✔
2676
  } else {
2677
    STREAM_CHECK_RET_GOTO(processTsOutPutAllGroups(sStreamReaderInfo, tsRsp, pResBlock, order));
7,512✔
2678
  }                             
2679
end:
206,501✔
2680
  blockDataDestroy(pResBlock);
206,740✔
2681
  taosMemoryFreeClear(pList);
206,542✔
2682
  STREAM_PRINT_LOG_END_WITHID(code, lino);
206,740✔
2683
  return code;
206,740✔
2684
}
2685

2686
static int32_t processTsOnce(SVnode* pVnode, SStreamTsResponse* tsRsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
50,759✔
2687
                                  SStreamReaderTaskInner* pTaskInner, uint64_t gid) {
2688
  int32_t code = 0;
50,759✔
2689
  int32_t lino = 0;
50,759✔
2690
  STableKeyInfo* pList = NULL;
50,759✔
2691
  void* pTask = sStreamReaderInfo->pTask;
50,759✔
2692
  
2693
  SSDataBlock*  pResBlock = NULL;
50,759✔
2694

2695
  int32_t        pNum = 0;
50,759✔
2696
  STREAM_CHECK_RET_GOTO(qStreamGetTableList(sStreamReaderInfo, gid, &pList, &pNum));
50,759✔
2697
  STREAM_CHECK_CONDITION_GOTO(pNum == 0, TSDB_CODE_SUCCESS);
50,759✔
2698
  STREAM_CHECK_RET_GOTO(createDataBlockTsUid(&pResBlock, pNum));
38,750✔
2699

2700
  pTaskInner->options->suid = sStreamReaderInfo->suid;
38,750✔
2701
  STREAM_CHECK_RET_GOTO(getAllTs(pVnode, pResBlock, pTaskInner, pList, pNum));
38,750✔
2702
  STREAM_CHECK_CONDITION_GOTO(pResBlock->info.rows == 0, TDB_CODE_SUCCESS);
38,750✔
2703
  int32_t order = pTaskInner->options->order;
38,750✔
2704

2705
  STREAM_CHECK_RET_GOTO(processTsOutPutOneGroup(sStreamReaderInfo, tsRsp, pResBlock, order));
38,510✔
2706
end:
50,759✔
2707
  blockDataDestroy(pResBlock);
50,759✔
2708
  taosMemoryFreeClear(pList);
50,305✔
2709
  STREAM_PRINT_LOG_END_WITHID(code, lino);
50,532✔
2710
  return code;
50,532✔
2711
}
2712

2713
static int32_t processTs(SVnode* pVnode, SStreamTsResponse* tsRsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
434,487✔
2714
                                  SStreamReaderTaskInner* pTaskInner) {
2715
  if (sStreamReaderInfo->isVtableStream) {
434,487✔
2716
    return processTsVTable(pVnode, tsRsp, sStreamReaderInfo, pTaskInner);
227,747✔
2717
  }
2718

2719
  return processTsNonVTable(pVnode, tsRsp, sStreamReaderInfo, pTaskInner);
206,950✔
2720
}
2721

2722
// Handle a "set table" pull request: install the trigger/calc uid maps carried by the request
// into the reader info and rebuild the virtual-table list, all under the reader's write latch.
//
// The maps are exchanged with TSWAP before they are validated, so after the swap `req` holds
// whatever the reader info held before. A NULL map in the request yields TSDB_CODE_INVALID_PARA.
// The response never carries a payload; only the result code is reported.
static int32_t vnodeProcessStreamSetTableReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   pTask = sStreamReaderInfo->pTask;  // consumed by the logging macros

  ST_TASK_DLOG("vgId:%d %s start, trigger hash size:%d, calc hash size:%d", TD_VID(pVnode), __func__,
                tSimpleHashGetSize(req->setTableReq.uidInfoTrigger), tSimpleHashGetSize(req->setTableReq.uidInfoCalc));

  taosWLockLatch(&sStreamReaderInfo->lock);

  TSWAP(sStreamReaderInfo->uidHashTrigger, req->setTableReq.uidInfoTrigger);
  TSWAP(sStreamReaderInfo->uidHashCalc, req->setTableReq.uidInfoCalc);
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidHashTrigger, TSDB_CODE_INVALID_PARA);
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidHashCalc, TSDB_CODE_INVALID_PARA);

  STREAM_CHECK_RET_GOTO(initStreamTableListInfo(&sStreamReaderInfo->vSetTableList));
  STREAM_CHECK_RET_GOTO(qBuildVTableList(sStreamReaderInfo));

end:
  // Every path above runs with the write latch held, so it is always released here.
  taosWUnLockLatch(&sStreamReaderInfo->lock);
  STREAM_PRINT_LOG_END_WITHID(code, lino);

  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = NULL, .contLen = 0, .code = code};
  tmsgSendRsp(&rsp);
  return code;
}
2748

2749
// Handle a "last ts" pull request.
//
// A timestamp-only reader task is created with descending order over the full time range
// [INT64_MIN, INT64_MAX] at the reader's current table-list version, then processTs() fills
// tsRsp. tsRsp.ver is reported as tableList.version + 1.
//
// A TDMT_STREAM_TRIGGER_PULL_RSP is always sent. The payload is only encoded when everything
// before it succeeded; on failure the response carries the failing code and no payload, so an
// earlier error is never replaced by the result of buildTsRsp().
//
// Returns the code that was sent in the response.
static int32_t vnodeProcessStreamLastTsReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  SStreamReaderTaskInner* pTaskInner = NULL;
  SStreamTsResponse       tsRsp = {0};
  void*                   buf = NULL;
  size_t                  size = 0;

  void* pTask = sStreamReaderInfo->pTask;  // consumed by the logging macros

  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  BUILD_OPTION(options, 0, sStreamReaderInfo->tableList.version, TSDB_ORDER_DESC, INT64_MIN, INT64_MAX, NULL, false, NULL);
  STREAM_CHECK_RET_GOTO(createStreamTaskForTs(&options, &pTaskInner, &sStreamReaderInfo->storageApi));

  tsRsp.ver = sStreamReaderInfo->tableList.version + 1;

  STREAM_CHECK_RET_GOTO(processTs(pVnode, &tsRsp, sStreamReaderInfo, pTaskInner));

end:
  ST_TASK_DLOG("vgId:%d %s get result size:%"PRIzu", ver:%"PRId64, TD_VID(pVnode), __func__, taosArrayGetSize(tsRsp.tsInfo), tsRsp.ver);
  if (code == TSDB_CODE_SUCCESS) {
    // Only serialise the result when no earlier step failed; keep the first error otherwise.
    code = buildTsRsp(&tsRsp, &buf, &size);
    if (code != TSDB_CODE_SUCCESS) {
      lino = __LINE__;
    }
  }
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  taosArrayDestroy(tsRsp.tsInfo);
  taosMemoryFree(pTaskInner);
  return code;
}
2779

2780
// Handle a "first ts" pull request.
//
// A timestamp-only reader task is created with ascending order over
// [req->firstTsReq.startTime, INT64_MAX] at version req->firstTsReq.ver. A non-zero gid restricts
// the scan to that group via processTsOnce(); gid 0 scans everything via processTs().
// tsRsp.ver is reported as the vnode's applied version.
//
// A TDMT_STREAM_TRIGGER_PULL_RSP is always sent. The payload is only encoded when everything
// before it succeeded; on failure the response carries the failing code and no payload, so an
// earlier error is never replaced by the result of buildTsRsp().
//
// Returns the code that was sent in the response.
static int32_t vnodeProcessStreamFirstTsReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  SStreamReaderTaskInner* pTaskInner = NULL;
  SStreamTsResponse       tsRsp = {0};
  void*                   buf = NULL;
  size_t                  size = 0;

  void* pTask = sStreamReaderInfo->pTask;  // consumed by the logging macros
  ST_TASK_DLOG("vgId:%d %s start, startTime:%"PRId64" ver:%"PRId64" gid:%"PRId64, TD_VID(pVnode), __func__, req->firstTsReq.startTime, req->firstTsReq.ver, req->firstTsReq.gid);

  tsRsp.ver = pVnode->state.applied;

  BUILD_OPTION(options, 0, req->firstTsReq.ver, TSDB_ORDER_ASC, req->firstTsReq.startTime, INT64_MAX, NULL, false, NULL);
  STREAM_CHECK_RET_GOTO(createStreamTaskForTs(&options, &pTaskInner, &sStreamReaderInfo->storageApi));

  if (req->firstTsReq.gid != 0) {
    STREAM_CHECK_RET_GOTO(processTsOnce(pVnode, &tsRsp, sStreamReaderInfo, pTaskInner, req->firstTsReq.gid));
  } else {
    STREAM_CHECK_RET_GOTO(processTs(pVnode, &tsRsp, sStreamReaderInfo, pTaskInner));
  }

end:
  ST_TASK_DLOG("vgId:%d %s get result size:%"PRIzu", ver:%"PRId64, TD_VID(pVnode), __func__, taosArrayGetSize(tsRsp.tsInfo), tsRsp.ver);
  if (code == TSDB_CODE_SUCCESS) {
    // Only serialise the result when no earlier step failed; keep the first error otherwise.
    code = buildTsRsp(&tsRsp, &buf, &size);
    if (code != TSDB_CODE_SUCCESS) {
      lino = __LINE__;
    }
  }
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  taosArrayDestroy(tsRsp.tsInfo);
  taosMemoryFree(pTaskInner);
  return code;
}
2814

2815
// Handle a TSDB "meta" pull request: return one row of block metadata (skey, ekey, uid,
// [groupId for non-vtable streams], rows) for each data block the reader reports, up to
// STREAM_RETURN_ROWS_NUM rows per response.
//
// The reader task is kept in sStreamReaderInfo->streamTaskMap under a key derived from the
// request's sessionId, so a follow-up request of a different type can continue the same scan.
// The map entry is removed once the reader reports no more blocks.
//
// A TDMT_STREAM_TRIGGER_PULL_RSP is always sent; the function returns the code placed in it.
static int32_t vnodeProcessStreamTsdbMetaReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;
  STableKeyInfo* pList = NULL;

  void* pTask = sStreamReaderInfo->pTask;  // consumed by the logging macros
  ST_TASK_DLOG("vgId:%d %s start, ver:%" PRId64 ",skey:%" PRId64 ",ekey:%" PRId64 ",gid:%" PRId64, TD_VID(pVnode),
               __func__, req->tsdbMetaReq.ver, req->tsdbMetaReq.startTime, req->tsdbMetaReq.endTime,
               req->tsdbMetaReq.gid);

  SStreamReaderTaskInner* pTaskInner = NULL;
  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_META);

  if (req->base.type == STRIGGER_PULL_TSDB_META) {
    // First request of a scan: build the reader task and register it under the session key.
    int32_t        pNum = 0;
    STREAM_CHECK_RET_GOTO(qStreamGetTableList(sStreamReaderInfo, req->tsdbMetaReq.gid, &pList, &pNum));
    BUILD_OPTION(options, getSuid(sStreamReaderInfo, pList), req->tsdbMetaReq.ver, req->tsdbMetaReq.order, req->tsdbMetaReq.startTime, req->tsdbMetaReq.endTime, 
                          sStreamReaderInfo->tsSchemas, true, NULL);
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, pList, pNum, &sStreamReaderInfo->storageApi));
    // NOTE(review): if taosHashPut fails, pTaskInner is not released in this function — confirm
    // whether that leaks the freshly created task.
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
    
    STREAM_CHECK_RET_GOTO(createBlockForTsdbMeta(&pTaskInner->pResBlockDst, sStreamReaderInfo->isVtableStream));
  } else {
    // Continuation request: the task must already be registered for this session.
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  }

  // Reset the output block so each response only contains rows gathered by this call.
  blockDataCleanup(pTaskInner->pResBlockDst);
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pTaskInner->pResBlockDst, STREAM_RETURN_ROWS_NUM));
  bool hasNext = true;
  while (true) {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
    if (!hasNext) {
      break;
    }
    // Only the block's metadata is needed; the block itself is released without loading rows here.
    pTaskInner->storageApi->tsdReader.tsdReaderReleaseDataBlock(pTaskInner->pReader);
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupIdFromSet(sStreamReaderInfo, pTaskInner->pResBlock->info.id.uid);

    // Column order of the meta row: skey, ekey, uid, [groupId], rows.
    int32_t index = 0;
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.window.skey));
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.window.ekey));
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.id.uid));
    if (!sStreamReaderInfo->isVtableStream) {
      // The groupId column is only written for non-vtable streams.
      STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.id.groupId));
    }
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.rows));

    stDebug("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
            pTaskInner->pResBlockDst->info.rows++;
    // Response is full: stop here with hasNext still true so the task stays registered.
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
      break;
    }
  }

  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
  printDataBlock(pTaskInner->pResBlockDst, __func__, "meta", ((SStreamTask *)sStreamReaderInfo->pTask)->streamId);
  if (!hasNext) {
    // Scan finished: drop the session entry.
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  taosMemoryFree(pList);
  return code;
}
2890

2891
// Handle a TSDB "ts data" pull request for a non-virtual-table stream.
//
// Reads the trigger columns of the single table req->tsdbTsDataReq.uid in ascending order over
// [skey, ekey]. Each block gets its tags processed (when it has rows), is filtered with the
// reader's filter and merged into the task's destination block. After the scan, the merged data
// is passed through blockDataTransform() into a block shaped like sStreamReaderInfo->tsBlock,
// which becomes the response payload.
//
// A uid whose group id lookup yields -1 is rejected with TSDB_CODE_INVALID_PARA.
// A TDMT_STREAM_TRIGGER_PULL_RSP is always sent; the function returns the code placed in it.
static int32_t vnodeProcessStreamTsdbTsDataReqNonVTable(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  void*                   buf = NULL;
  size_t                  size = 0;
  SSDataBlock*            pBlockRes = NULL;
  SStreamReaderTaskInner* pTaskInner = NULL;
  void*                   pTask = sStreamReaderInfo->pTask;  // consumed by the logging macros

  ST_TASK_DLOG("vgId:%d %s start, ver:%"PRId64",skey:%"PRId64",ekey:%"PRId64",uid:%"PRId64",suid:%"PRId64, TD_VID(pVnode), __func__, req->tsdbTsDataReq.ver,
                req->tsdbTsDataReq.skey, req->tsdbTsDataReq.ekey,
                req->tsdbTsDataReq.uid, req->tsdbTsDataReq.suid);

  // The scan covers exactly one table, described by this stack-allocated key.
  STableKeyInfo tbKey = {.groupId = qStreamGetGroupIdFromSet(sStreamReaderInfo, req->tsdbTsDataReq.uid), .uid = req->tsdbTsDataReq.uid};
  STREAM_CHECK_CONDITION_GOTO(tbKey.groupId == -1, TSDB_CODE_INVALID_PARA);

  BUILD_OPTION(options, getSuid(sStreamReaderInfo, &tbKey), req->tsdbTsDataReq.ver, TSDB_ORDER_ASC, req->tsdbTsDataReq.skey, req->tsdbTsDataReq.ekey,
               sStreamReaderInfo->triggerCols, false, NULL);
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, &tbKey, 1, &sStreamReaderInfo->storageApi));
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->tsBlock, false, &pBlockRes));

  for (;;) {
    bool more = false;
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &more));
    if (!more) {
      break;
    }

    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupIdFromSet(sStreamReaderInfo, pTaskInner->pResBlock->info.id.uid);

    SSDataBlock* pData = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pData));
    if (pData != NULL && pData->info.rows > 0) {
      STREAM_CHECK_RET_GOTO(processTag(sStreamReaderInfo, false, pData->info.id.uid, pData, 0, pData->info.rows, 1));
    }

    STREAM_CHECK_RET_GOTO(qStreamFilter(pData, sStreamReaderInfo->pFilterInfo, NULL));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pData));
    ST_TASK_DLOG("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
  }

  blockDataTransform(pBlockRes, pTaskInner->pResBlockDst);

  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);

  // One-shot request: nothing is kept for a follow-up call.
  blockDataDestroy(pBlockRes);
  releaseStreamTask(&pTaskInner);
  return code;
}
2952

UNCOV
2953
// Handle a TSDB "ts data" pull request for a virtual-table stream.
//
// Reads the single table req->tsdbTsDataReq.uid in ascending order over [skey, ekey] using the
// reader's tsSchemas, and merges every returned block directly into a block shaped like
// sStreamReaderInfo->tsBlock, which becomes the response payload. Unlike the non-vtable variant,
// this function applies no tag processing, filter or transform step.
//
// A uid whose group id lookup yields -1 is rejected with TSDB_CODE_INVALID_PARA.
// A TDMT_STREAM_TRIGGER_PULL_RSP is always sent; the function returns the code placed in it.
static int32_t vnodeProcessStreamTsdbTsDataReqVTable(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  void*                   buf = NULL;
  size_t                  size = 0;
  SSDataBlock*            pBlockRes = NULL;
  SStreamReaderTaskInner* pTaskInner = NULL;
  void*                   pTask = sStreamReaderInfo->pTask;  // consumed by the logging macros

  ST_TASK_DLOG("vgId:%d %s start, ver:%"PRId64",skey:%"PRId64",ekey:%"PRId64",uid:%"PRId64",suid:%"PRId64, TD_VID(pVnode), __func__, req->tsdbTsDataReq.ver,
                req->tsdbTsDataReq.skey, req->tsdbTsDataReq.ekey,
                req->tsdbTsDataReq.uid, req->tsdbTsDataReq.suid);

  // The scan covers exactly one table, described by this stack-allocated key.
  STableKeyInfo tbKey = {.groupId = qStreamGetGroupIdFromSet(sStreamReaderInfo, req->tsdbTsDataReq.uid), .uid = req->tsdbTsDataReq.uid};
  STREAM_CHECK_CONDITION_GOTO(tbKey.groupId == -1, TSDB_CODE_INVALID_PARA);

  BUILD_OPTION(options, getSuid(sStreamReaderInfo, &tbKey), req->tsdbTsDataReq.ver, TSDB_ORDER_ASC, req->tsdbTsDataReq.skey, req->tsdbTsDataReq.ekey,
               sStreamReaderInfo->tsSchemas, true, NULL);
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->tsBlock, &tbKey, 1, &sStreamReaderInfo->storageApi));
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->tsBlock, false, &pBlockRes));

  for (;;) {
    bool more = false;
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &more));
    if (!more) {
      break;
    }

    SSDataBlock* pData = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pData));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pBlockRes, pData));
    ST_TASK_DLOG("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
            TD_VID(pVnode), __func__, pBlockRes->info.window.skey, pBlockRes->info.window.ekey,
            pBlockRes->info.id.uid, pBlockRes->info.id.groupId, pBlockRes->info.rows);
  }

  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlockRes->info.rows);
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);

  // One-shot request: nothing is kept for a follow-up call.
  blockDataDestroy(pBlockRes);
  releaseStreamTask(&pTaskInner);
  return code;
}
3002

3003
// Handle a TSDB "trigger data" pull request: stream the trigger columns of a table group
// starting at req->tsdbTriggerDataReq.startTime, returning the filtered blocks as an array.
//
// The reader task is kept in sStreamReaderInfo->streamTaskMap under a key derived from the
// request's sessionId, so a follow-up request of a different type can continue the same scan.
// A response is closed once the accumulated row count reaches STREAM_RETURN_ROWS_NUM; the map
// entry is removed when the reader reports no more blocks.
//
// A TDMT_STREAM_TRIGGER_PULL_RSP is always sent; the function returns the code placed in it.
static int32_t vnodeProcessStreamTsdbTriggerDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;
  STableKeyInfo* pList = NULL;
  SArray*        pResList = NULL;   // array of SSDataBlock* copies, destroyed with their blocks at end
  SSDataBlock*   pBlockTmp = NULL;  // copy not yet handed over to pResList

  SStreamReaderTaskInner* pTaskInner = NULL;
  void* pTask = sStreamReaderInfo->pTask;  // consumed by the logging macros
  ST_TASK_DLOG("vgId:%d %s start. ver:%"PRId64",order:%d,startTs:%"PRId64",gid:%"PRId64, TD_VID(pVnode), __func__, req->tsdbTriggerDataReq.ver, req->tsdbTriggerDataReq.order, req->tsdbTriggerDataReq.startTime, req->tsdbTriggerDataReq.gid);
  
  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_TRIGGER_DATA);

  if (req->base.type == STRIGGER_PULL_TSDB_TRIGGER_DATA) {
    // First request of a scan: build the reader task and register it under the session key.
    int32_t        pNum = 0;
    STREAM_CHECK_RET_GOTO(qStreamGetTableList(sStreamReaderInfo, req->tsdbTriggerDataReq.gid, &pList, &pNum));
    BUILD_OPTION(options, getSuid(sStreamReaderInfo, pList), req->tsdbTriggerDataReq.ver, req->tsdbTriggerDataReq.order, req->tsdbTriggerDataReq.startTime, INT64_MAX,
                 sStreamReaderInfo->triggerCols, false, NULL);
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, pList, pNum, &sStreamReaderInfo->storageApi));
    // NOTE(review): if taosHashPut fails, pTaskInner is not released in this function — confirm
    // whether that leaks the freshly created task.
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
  } else {
    // Continuation request: the task must already be registered for this session.
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  }

  blockDataCleanup(pTaskInner->pResBlockDst);
  bool hasNext = true;
  int32_t totalRows = 0;
    
  pResList = taosArrayInit(4, POINTER_BYTES);
  STREAM_CHECK_NULL_GOTO(pResList, terrno);
  while (1) {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
    if (!hasNext) {
      break;
    }
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupIdFromSet(sStreamReaderInfo, pTaskInner->pResBlock->info.id.uid);
    // pTaskInner->pResBlockDst->info.id.groupId = pTaskInner->pResBlock->info.id.groupId;

    SSDataBlock* pBlock = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
    if (pBlock != NULL && pBlock->info.rows > 0) {
      STREAM_CHECK_RET_GOTO(
        processTag(sStreamReaderInfo, false, pBlock->info.id.uid, pBlock, 0, pBlock->info.rows, 1));
    }
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, sStreamReaderInfo->pFilterInfo, NULL));
    // STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
    // NOTE(review): pBlock is dereferenced here although the tag step above guards against NULL —
    // confirm getTableData never yields NULL on success.
    ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlock->info.rows);
    // Each filtered block is copied and queued separately instead of being merged into one block.
    STREAM_CHECK_RET_GOTO(createOneDataBlock(pBlock, true, &pBlockTmp));
    STREAM_CHECK_NULL_GOTO(taosArrayPush(pResList, &pBlockTmp), terrno);
    totalRows += blockDataGetNumOfRows(pBlockTmp);
    // The copy is now referenced from pResList; clear the local so `end` does not destroy it twice.
    pBlockTmp = NULL;

    ST_TASK_DLOG("vgId:%d %s get skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
    // Response is full: stop here with hasNext still true so the task stays registered.
    if (totalRows >= STREAM_RETURN_ROWS_NUM) {  //todo optimize send multi blocks in one group
      break;
    }
  }

  STREAM_CHECK_RET_GOTO(buildArrayRsp(pResList, &buf, &size));
  if (!hasNext) {
    // Scan finished: drop the session entry.
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  taosMemoryFree(pList);
  // Non-NULL only when a copy was created but could not be pushed into pResList.
  blockDataDestroy(pBlockTmp);
  taosArrayDestroyP(pResList, (FDelete)blockDataDestroy);
  return code;
}
3083

3084
// Handle a TSDB "calc data" pull request: read the trigger columns of a table group over
// [req->tsdbCalcDataReq.skey, req->tsdbCalcDataReq.ekey] in ascending order, filter and merge the
// blocks, then transform the merged data into a block shaped like sStreamReaderInfo->calcResBlock
// for the response payload.
//
// Fails with TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN when the reader has no triggerCols.
// The reader task is kept in sStreamReaderInfo->streamTaskMap under a key derived from the
// request's sessionId; a response is closed once the merged block reaches STREAM_RETURN_ROWS_NUM
// rows, and the map entry is removed when the reader reports no more blocks.
//
// A TDMT_STREAM_TRIGGER_PULL_RSP is always sent; the function returns the code placed in it.
static int32_t vnodeProcessStreamTsdbCalcDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;
  SSDataBlock*   pBlockRes = NULL;
  STableKeyInfo* pList = NULL;


  void* pTask = sStreamReaderInfo->pTask;  // consumed by the logging macros
  ST_TASK_DLOG("vgId:%d %s start, skey:%"PRId64",ekey:%"PRId64",gid:%"PRId64",ver:%"PRId64, TD_VID(pVnode), __func__, 
    req->tsdbCalcDataReq.skey, req->tsdbCalcDataReq.ekey, req->tsdbCalcDataReq.gid, req->tsdbCalcDataReq.ver);

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->triggerCols, TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN);

  SStreamReaderTaskInner* pTaskInner = NULL;
  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_CALC_DATA);

  if (req->base.type == STRIGGER_PULL_TSDB_CALC_DATA) {
    // First request of a scan: build the reader task and register it under the session key.
    int32_t        pNum = 0;
    STREAM_CHECK_RET_GOTO(qStreamGetTableList(sStreamReaderInfo, req->tsdbCalcDataReq.gid, &pList, &pNum));
    BUILD_OPTION(options, getSuid(sStreamReaderInfo, pList), req->tsdbCalcDataReq.ver, TSDB_ORDER_ASC, req->tsdbCalcDataReq.skey, req->tsdbCalcDataReq.ekey,
                 sStreamReaderInfo->triggerCols, false, NULL);
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, pList, pNum, &sStreamReaderInfo->storageApi));

    // NOTE(review): if taosHashPut fails, pTaskInner is not released in this function — confirm
    // whether that leaks the freshly created task.
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
  } else {
    // Continuation request: the task must already be registered for this session.
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  }

  // Reset the merge target so each response only contains rows gathered by this call.
  blockDataCleanup(pTaskInner->pResBlockDst);
  bool hasNext = true;
  while (1) {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
    if (!hasNext) {
      break;
    }
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupIdFromSet(sStreamReaderInfo, pTaskInner->pResBlock->info.id.uid);

    SSDataBlock* pBlock = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, sStreamReaderInfo->pFilterInfo, NULL));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
    // Response is full: stop here with hasNext still true so the task stays registered.
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
      break;
    }
  }

  // Build the payload block in the calc layout, sized to the merged block's capacity.
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->calcResBlock, false, &pBlockRes));
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlockRes, pTaskInner->pResBlockDst->info.capacity));
  blockDataTransform(pBlockRes, pTaskInner->pResBlockDst);
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));
  // NOTE(review): the same block is printed twice below under two different labels — confirm
  // whether both are intended.
  printDataBlock(pBlockRes, __func__, "tsdb_calc_data", ((SStreamTask*)pTask)->streamId);
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlockRes->info.rows);
  printDataBlock(pBlockRes, __func__, "tsdb_data", ((SStreamTask*)pTask)->streamId);

  if (!hasNext) {
    // Scan finished: drop the session entry.
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  blockDataDestroy(pBlockRes);
  taosMemoryFree(pList);
  return code;
}
3157

3158
/*
 * Serves a TSDB data pull for the table identified by req->tsdbDataReq.uid.
 *
 * - STRIGGER_PULL_TSDB_DATA: builds a new reader task for the requested column ids and
 *   registers it in sStreamReaderInfo->streamTaskMap under the uid.
 * - any other request type: resumes the task already registered under that uid
 *   (TSDB_CODE_STREAM_NO_CONTEXT when there is none).
 *
 * Blocks are merged into the task's pResBlockDst until the reader reports no more data or
 * STREAM_RETURN_ROWS_NUM rows have been collected. Once the reader is exhausted the task is
 * removed from the map. Exactly one TDMT_STREAM_TRIGGER_PULL_RSP is sent on every path,
 * carrying `code`.
 *
 * Returns 0 on success, otherwise the first error code encountered.
 */
static int32_t vnodeProcessStreamTsdbVirtalDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  void*                   rspBuf = NULL;
  size_t                  rspLen = 0;
  int32_t*                slotMap = NULL;    // slotMap[pos in sorted cids] = pos in requested cids
  SArray*                 cidsAsc = NULL;    // sorted copy of the requested column ids
  SArray*                 schemas = NULL;
  SSDataBlock*            pNewBlock = NULL;  // owned here until handed over to the task
  SStreamReaderTaskInner* pTaskInner = NULL;
  bool                    hasNext = true;

  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, skey:%"PRId64",ekey:%"PRId64",uid:%"PRId64",ver:%"PRId64, TD_VID(pVnode), __func__, 
    req->tsdbDataReq.skey, req->tsdbDataReq.ekey, req->tsdbDataReq.uid, req->tsdbDataReq.ver);

  int64_t key = req->tsdbDataReq.uid;

  if (req->base.type != STRIGGER_PULL_TSDB_DATA) {
    // Follow-up pull: continue with the task stored by the first request.
    void** ppSaved = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(ppSaved, TSDB_CODE_STREAM_NO_CONTEXT);
    pTaskInner = *(SStreamReaderTaskInner**)ppSaved;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  } else {
    // First pull: map every sorted column position back to its position in the request.
    slotMap = taosMemoryMalloc(taosArrayGetSize(req->tsdbDataReq.cids) * sizeof(int32_t));
    STREAM_CHECK_NULL_GOTO(slotMap, terrno);
    cidsAsc = taosArrayDup(req->tsdbDataReq.cids, NULL);
    STREAM_CHECK_NULL_GOTO(cidsAsc, terrno);
    taosArraySort(cidsAsc, sortCid);

    for (int32_t reqIdx = 0; reqIdx < taosArrayGetSize(req->tsdbDataReq.cids); ++reqIdx) {
      int16_t* pReqCid = taosArrayGet(req->tsdbDataReq.cids, reqIdx);
      STREAM_CHECK_NULL_GOTO(pReqCid, terrno);

      int32_t sortedIdx = 0;
      while (sortedIdx < taosArrayGetSize(cidsAsc)) {
        int16_t* pSortedCid = taosArrayGet(cidsAsc, sortedIdx);
        STREAM_CHECK_NULL_GOTO(pSortedCid, terrno);
        if (*pReqCid == *pSortedCid) {
          slotMap[sortedIdx] = reqIdx;
          break;
        }
        ++sortedIdx;
      }
    }

    STREAM_CHECK_RET_GOTO(buildScheamFromMeta(pVnode, req->tsdbDataReq.uid, &schemas, &sStreamReaderInfo->storageApi));
    STREAM_CHECK_RET_GOTO(shrinkScheams(req->tsdbDataReq.cids, schemas));
    // The result block is created before the schemas are sorted.
    STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, &pNewBlock));

    taosArraySort(schemas, sortSSchema);
    BUILD_OPTION(options, req->tsdbDataReq.suid, req->tsdbDataReq.ver, req->tsdbDataReq.order, req->tsdbDataReq.skey,
                    req->tsdbDataReq.ekey, schemas, true, &slotMap);
    STableKeyInfo keyInfo = {.uid = req->tsdbDataReq.uid, .groupId = 0};
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, pNewBlock, &keyInfo, 1, &sStreamReaderInfo->storageApi));
    // NOTE(review): if this put fails the freshly created task is not released here - confirm.
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));

    // The task now owns the block; clear the local so the cleanup below does not destroy it.
    pTaskInner->pResBlockDst = pNewBlock;
    pNewBlock = NULL;
  }

  blockDataCleanup(pTaskInner->pResBlockDst);
  for (;;) {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
    if (!hasNext) {
      break;
    }

    SSDataBlock* pChunk = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pChunk));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pChunk));
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
      break;
    }
  }

  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &rspBuf, &rspLen));
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
  printDataBlock(pTaskInner->pResBlockDst, __func__, "tsdb_data", ((SStreamTask*)pTask)->streamId);

  // Reader exhausted: no follow-up pull is expected, drop the saved context.
  if (!hasNext) {
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = rspBuf, .contLen = rspLen, .code = code};
  tmsgSendRsp(&rsp);
  taosMemFree(slotMap);
  taosArrayDestroy(cidsAsc);
  taosArrayDestroy(schemas);
  blockDataDestroy(pNewBlock);
  return code;
}
3247

3248
/*
 * Handles a WAL meta pull that starts at req->walMetaNewReq.lastVer.
 *
 * The reader's cached metaBlock is created on first use, emptied, and filled by
 * processWalVerMetaNew(). When rows were produced the serialized SSTriggerWalNewRsp is
 * returned. When no rows were produced (and nothing failed) the response carries
 * TSDB_CODE_STREAM_NO_DATA with a 16-byte payload {ver, verTime}, and the function itself
 * returns 0.
 *
 * Exactly one TDMT_STREAM_TRIGGER_PULL_RSP is sent on every path.
 * Returns 0 on success / no data, otherwise the error code.
 */
static int32_t vnodeProcessStreamWalMetaNewReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t            code = 0;
  int32_t            lino = 0;
  void*              buf = NULL;
  size_t             size = 0;
  SSTriggerWalNewRsp resultRsp = {0};

  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, request paras lastVer:%" PRId64, TD_VID(pVnode), __func__, req->walMetaNewReq.lastVer);

  if (sStreamReaderInfo->metaBlock == NULL) {
    STREAM_CHECK_RET_GOTO(createBlockForWalMetaNew((SSDataBlock**)&sStreamReaderInfo->metaBlock));
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(sStreamReaderInfo->metaBlock, STREAM_RETURN_ROWS_NUM));
  }
  blockDataEmpty(sStreamReaderInfo->metaBlock);
  resultRsp.metaBlock = sStreamReaderInfo->metaBlock;
  resultRsp.ver = req->walMetaNewReq.lastVer;
  STREAM_CHECK_RET_GOTO(processWalVerMetaNew(pVnode, &resultRsp, sStreamReaderInfo, req->walMetaNewReq.ctime));

  ST_TASK_DLOG("vgId:%d %s get result last ver:%"PRId64" rows:%d", TD_VID(pVnode), __func__, resultRsp.ver, resultRsp.totalRows);
  STREAM_CHECK_CONDITION_GOTO(resultRsp.totalRows == 0, TDB_CODE_SUCCESS);

  // `size` stays 0 until the buffer exists, so a failed allocation never yields a
  // response with a NULL payload and a non-zero length.
  size_t len = tSerializeSStreamWalDataResponse(NULL, 0, &resultRsp);
  buf = rpcMallocCont(len);
  STREAM_CHECK_NULL_GOTO(buf, terrno);
  size = tSerializeSStreamWalDataResponse(buf, len, &resultRsp);
  printDataBlock(sStreamReaderInfo->metaBlock, __func__, "meta", ((SStreamTask*)pTask)->streamId);
  printDataBlock(resultRsp.deleteBlock, __func__, "delete", ((SStreamTask*)pTask)->streamId);
  printDataBlock(resultRsp.tableBlock, __func__, "table", ((SStreamTask*)pTask)->streamId);

end:
  if (code == 0 && resultRsp.totalRows == 0) {
    // Nothing new: still tell the requester how far the scan got.
    buf = rpcMallocCont(sizeof(int64_t) * 2);
    if (buf == NULL) {
      code = terrno;
      lino = __LINE__;
    } else {
      *(int64_t*)buf = resultRsp.ver;
      *(((int64_t*)buf) + 1) = resultRsp.verTime;
      size = sizeof(int64_t) * 2;
      code = TSDB_CODE_STREAM_NO_DATA;
    }
  }
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  // NO_DATA is only a wire-level status; it is not an error for the caller.
  if (code == TSDB_CODE_STREAM_NO_DATA){
    code = 0;
  }
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  // metaBlock is cached on sStreamReaderInfo and is not destroyed here.
  blockDataDestroy(resultRsp.deleteBlock);
  blockDataDestroy(resultRsp.tableBlock);

  return code;
}
3297
/*
 * Handles a combined WAL meta + data pull that starts at req->walMetaDataNewReq.lastVer.
 *
 * Uses the reader's cached metaBlock (created on first use) and a fresh data block cloned
 * from triggerBlock, then lets processWalVerMetaDataNew() fill the response. When rows were
 * produced the serialized SSTriggerWalNewRsp is returned. When no rows were produced and
 * nothing failed, the response carries TSDB_CODE_STREAM_NO_DATA with a 16-byte payload
 * {ver, verTime} and the function returns 0.
 *
 * A failure is reported to the requester as that failure: the no-data reply is only built
 * when `code == 0`, matching vnodeProcessStreamWalMetaNewReq.
 *
 * Exactly one TDMT_STREAM_TRIGGER_PULL_RSP is sent on every path.
 */
static int32_t vnodeProcessStreamWalMetaDataNewReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t            code = 0;
  int32_t            lino = 0;
  void*              buf = NULL;
  size_t             size = 0;
  SSTriggerWalNewRsp resultRsp = {0};

  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, request paras lastVer:%" PRId64, TD_VID(pVnode), __func__, req->walMetaDataNewReq.lastVer);

  if (sStreamReaderInfo->metaBlock == NULL) {
    STREAM_CHECK_RET_GOTO(createBlockForWalMetaNew((SSDataBlock**)&sStreamReaderInfo->metaBlock));
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(sStreamReaderInfo->metaBlock, STREAM_RETURN_ROWS_NUM));
  }

  resultRsp.metaBlock = sStreamReaderInfo->metaBlock;
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerBlock, false, (SSDataBlock**)&resultRsp.dataBlock));
  resultRsp.ver = req->walMetaDataNewReq.lastVer;
  resultRsp.checkAlter = true;
  resultRsp.indexHash = tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
  STREAM_CHECK_NULL_GOTO(resultRsp.indexHash, terrno);

  STREAM_CHECK_RET_GOTO(processWalVerMetaDataNew(pVnode, sStreamReaderInfo, &resultRsp));

  STREAM_CHECK_CONDITION_GOTO(resultRsp.totalRows == 0, TDB_CODE_SUCCESS);

  // `size` stays 0 until the buffer exists, so a failed allocation never yields a
  // response with a NULL payload and a non-zero length.
  size_t len = tSerializeSStreamWalDataResponse(NULL, 0, &resultRsp);
  buf = rpcMallocCont(len);
  STREAM_CHECK_NULL_GOTO(buf, terrno);
  size = tSerializeSStreamWalDataResponse(buf, len, &resultRsp);
  printDataBlock(sStreamReaderInfo->metaBlock, __func__, "meta", ((SStreamTask*)pTask)->streamId);
  printDataBlock(resultRsp.dataBlock, __func__, "data", ((SStreamTask*)pTask)->streamId);
  printDataBlock(resultRsp.deleteBlock, __func__, "delete", ((SStreamTask*)pTask)->streamId);
  printDataBlock(resultRsp.tableBlock, __func__, "table", ((SStreamTask*)pTask)->streamId);
  printIndexHash(resultRsp.indexHash, pTask);

end:
  if (code == 0 && resultRsp.totalRows == 0) {
    // Nothing new: still tell the requester how far the scan got.
    buf = rpcMallocCont(sizeof(int64_t) * 2);
    if (buf == NULL) {
      code = terrno;
      lino = __LINE__;
    } else {
      *(int64_t*)buf = resultRsp.ver;
      *(((int64_t*)buf) + 1) = resultRsp.verTime;
      size = sizeof(int64_t) * 2;
      code = TSDB_CODE_STREAM_NO_DATA;
    }
  }
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  // NO_DATA is only a wire-level status; it is not an error for the caller.
  if (code == TSDB_CODE_STREAM_NO_DATA){
    code = 0;
  }
  // metaBlock is cached on sStreamReaderInfo and is not destroyed here.
  blockDataDestroy(resultRsp.dataBlock);
  blockDataDestroy(resultRsp.deleteBlock);
  blockDataDestroy(resultRsp.tableBlock);
  tSimpleHashCleanup(resultRsp.indexHash);

  STREAM_PRINT_LOG_END_WITHID(code, lino);

  return code;
}
3354

3355
/*
 * Handles a WAL data pull for the versions/ranges listed in req->walDataNewReq.
 *
 * A fresh data block is cloned from triggerBlock and filled by processWalVerDataNew().
 * When rows were produced the serialized SSTriggerWalNewRsp is returned. When no rows were
 * produced and nothing failed, the response carries TSDB_CODE_STREAM_NO_DATA with an 8-byte
 * payload {ver} and the function returns 0.
 *
 * A failure is reported to the requester as that failure: the no-data reply is only built
 * when `code == 0`, matching vnodeProcessStreamWalMetaNewReq.
 *
 * Exactly one TDMT_STREAM_TRIGGER_PULL_RSP is sent on every path.
 */
static int32_t vnodeProcessStreamWalDataNewReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t            code = 0;
  int32_t            lino = 0;
  void*              buf = NULL;
  size_t             size = 0;
  SSTriggerWalNewRsp resultRsp = {0};

  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, request paras size:%zu", TD_VID(pVnode), __func__, taosArrayGetSize(req->walDataNewReq.versions));

  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerBlock, false, (SSDataBlock**)&resultRsp.dataBlock));
  resultRsp.indexHash = tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
  STREAM_CHECK_NULL_GOTO(resultRsp.indexHash, terrno);
  resultRsp.uidHash = tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
  STREAM_CHECK_NULL_GOTO(resultRsp.uidHash, terrno);

  STREAM_CHECK_RET_GOTO(processWalVerDataNew(pVnode, sStreamReaderInfo, req->walDataNewReq.versions, req->walDataNewReq.ranges, &resultRsp));
  ST_TASK_DLOG("vgId:%d %s get result last ver:%"PRId64" rows:%d", TD_VID(pVnode), __func__, resultRsp.ver, resultRsp.totalRows);

  STREAM_CHECK_CONDITION_GOTO(resultRsp.totalRows == 0, TDB_CODE_SUCCESS);

  // `size` stays 0 until the buffer exists, so a failed allocation never yields a
  // response with a NULL payload and a non-zero length.
  size_t len = tSerializeSStreamWalDataResponse(NULL, 0, &resultRsp);
  buf = rpcMallocCont(len);
  STREAM_CHECK_NULL_GOTO(buf, terrno);
  size = tSerializeSStreamWalDataResponse(buf, len, &resultRsp);
  printDataBlock(resultRsp.dataBlock, __func__, "data", ((SStreamTask*)pTask)->streamId);
  printIndexHash(resultRsp.indexHash, pTask);

end:
  if (code == 0 && resultRsp.totalRows == 0) {
    // Nothing new: reply with the version only.
    buf = rpcMallocCont(sizeof(int64_t));
    if (buf == NULL) {
      code = terrno;
      lino = __LINE__;
    } else {
      *(int64_t *)buf = resultRsp.ver;
      size = sizeof(int64_t);
      code = TSDB_CODE_STREAM_NO_DATA;
    }
  }
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  // NO_DATA is only a wire-level status; it is not an error for the caller.
  if (code == TSDB_CODE_STREAM_NO_DATA){
    code = 0;
  }

  blockDataDestroy(resultRsp.dataBlock);
  blockDataDestroy(resultRsp.deleteBlock);
  blockDataDestroy(resultRsp.tableBlock);
  tSimpleHashCleanup(resultRsp.indexHash);
  tSimpleHashCleanup(resultRsp.uidHash);
  STREAM_PRINT_LOG_END_WITHID(code, lino);

  return code;
}
3405

3406
/*
 * Handles a WAL "calc" data pull for the versions/ranges listed in req->walDataNewReq.
 *
 * For a vtable stream the rows are read directly in calcBlock layout (isCalc = true).
 * Otherwise they are read in triggerBlock layout and then transformed into a block cloned
 * from calcBlock, which replaces the response data block.
 *
 * When rows were produced the serialized SSTriggerWalNewRsp is returned. When no rows were
 * produced and nothing failed, the response carries TSDB_CODE_STREAM_NO_DATA with an 8-byte
 * payload {ver} and the function returns 0.
 *
 * A failure is reported to the requester as that failure: the no-data reply is only built
 * when `code == 0`, matching vnodeProcessStreamWalMetaNewReq.
 *
 * Exactly one TDMT_STREAM_TRIGGER_PULL_RSP is sent on every path.
 */
static int32_t vnodeProcessStreamWalCalcDataNewReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t            code = 0;
  int32_t            lino = 0;
  void*              buf = NULL;
  size_t             size = 0;
  SSTriggerWalNewRsp resultRsp = {0};
  SSDataBlock*       pCalcBlock = NULL;  // calc-layout block, owned here until moved into resultRsp

  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, request paras size:%zu", TD_VID(pVnode), __func__, taosArrayGetSize(req->walDataNewReq.versions));

  SSDataBlock* dataBlock = sStreamReaderInfo->isVtableStream ? sStreamReaderInfo->calcBlock : sStreamReaderInfo->triggerBlock;
  STREAM_CHECK_RET_GOTO(createOneDataBlock(dataBlock, false, (SSDataBlock**)&resultRsp.dataBlock));
  resultRsp.isCalc = sStreamReaderInfo->isVtableStream ? true : false;
  resultRsp.indexHash = tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
  STREAM_CHECK_NULL_GOTO(resultRsp.indexHash, terrno);
  resultRsp.uidHash = tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
  STREAM_CHECK_NULL_GOTO(resultRsp.uidHash, terrno);

  STREAM_CHECK_RET_GOTO(processWalVerDataNew(pVnode, sStreamReaderInfo, req->walDataNewReq.versions, req->walDataNewReq.ranges, &resultRsp));
  STREAM_CHECK_CONDITION_GOTO(resultRsp.totalRows == 0, TDB_CODE_SUCCESS);

  if (!sStreamReaderInfo->isVtableStream){
    // Convert the trigger-layout rows into calc layout and swap the blocks.
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->calcBlock, false, &pCalcBlock));

    blockDataTransform(pCalcBlock, resultRsp.dataBlock);
    blockDataDestroy(resultRsp.dataBlock);
    resultRsp.dataBlock = pCalcBlock;
    pCalcBlock = NULL;
  }

  // `size` stays 0 until the buffer exists, so a failed allocation never yields a
  // response with a NULL payload and a non-zero length.
  size_t len = tSerializeSStreamWalDataResponse(NULL, 0, &resultRsp);
  buf = rpcMallocCont(len);
  STREAM_CHECK_NULL_GOTO(buf, terrno);
  size = tSerializeSStreamWalDataResponse(buf, len, &resultRsp);
  printDataBlock(resultRsp.dataBlock, __func__, "data", ((SStreamTask*)pTask)->streamId);
  printIndexHash(resultRsp.indexHash, pTask);

end:
  if (code == 0 && resultRsp.totalRows == 0) {
    // Nothing new: reply with the version only.
    buf = rpcMallocCont(sizeof(int64_t));
    if (buf == NULL) {
      code = terrno;
      lino = __LINE__;
    } else {
      *(int64_t *)buf = resultRsp.ver;
      size = sizeof(int64_t);
      code = TSDB_CODE_STREAM_NO_DATA;
    }
  }
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  // NO_DATA is only a wire-level status; it is not an error for the caller.
  if (code == TSDB_CODE_STREAM_NO_DATA){
    code = 0;
  }

  blockDataDestroy(pCalcBlock);
  blockDataDestroy(resultRsp.dataBlock);
  blockDataDestroy(resultRsp.deleteBlock);
  blockDataDestroy(resultRsp.tableBlock);
  tSimpleHashCleanup(resultRsp.indexHash);
  tSimpleHashCleanup(resultRsp.uidHash);
  STREAM_PRINT_LOG_END_WITHID(code, lino);

  return code;
}
3469

3470
/*
 * Returns the group column values stored for req->groupColValueReq.gid.
 *
 * Looks the group up in sStreamReaderInfo->groupIdMap (TSDB_CODE_STREAM_NO_CONTEXT when
 * absent), serializes it with tSerializeSStreamGroupInfo() and sends it back. On any failure
 * the payload is freed and an empty response carrying the error code is sent instead.
 *
 * The serializer result is kept in a signed variable: the previous code stored it in a
 * size_t and then tested `size < 0`, which can never be true, so serialization failures
 * went undetected.
 *
 * Exactly one TDMT_STREAM_TRIGGER_PULL_RSP is sent on every path.
 */
static int32_t vnodeProcessStreamGroupColValueReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t  code = 0;
  int32_t  lino = 0;
  void*    buf = NULL;
  size_t   size = 0;
  int32_t  len = 0;  // signed so a negative serializer result is detectable
  SArray** gInfo = NULL;

  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, request gid:%" PRId64, TD_VID(pVnode), __func__, req->groupColValueReq.gid);

  gInfo = taosHashAcquire(sStreamReaderInfo->groupIdMap, &req->groupColValueReq.gid, POINTER_BYTES);
  STREAM_CHECK_NULL_GOTO(gInfo, TSDB_CODE_STREAM_NO_CONTEXT);
  SStreamGroupInfo pGroupInfo = {0};
  pGroupInfo.gInfo = *gInfo;

  // First pass sizes the payload, second pass fills it.
  len = tSerializeSStreamGroupInfo(NULL, 0, &pGroupInfo, TD_VID(pVnode));
  STREAM_CHECK_CONDITION_GOTO(len < 0, len);
  buf = rpcMallocCont(len);
  STREAM_CHECK_NULL_GOTO(buf, terrno);
  len = tSerializeSStreamGroupInfo(buf, len, &pGroupInfo, TD_VID(pVnode));
  STREAM_CHECK_CONDITION_GOTO(len < 0, len);
  size = (size_t)len;

end:
  // Reached with gInfo == NULL when the lookup failed, same as before this change.
  taosHashRelease(sStreamReaderInfo->groupIdMap, gInfo);
  if (code != 0) {
    rpcFreeCont(buf);
    buf = NULL;
    size = 0;
  }
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);

  return code;
}
3505

3506
/*
 * Appends one VTableInfo for virtual table `uid` (group `gid`) to `infos` and fills its
 * column references from the table's meta entry at sStreamReaderInfo->tableList.version.
 *
 * - isVtableOnlyTs == 1: every column reference of the table is copied.
 * - otherwise: one slot per requested column id in `cids`; slot i receives the reference of
 *   the first column that has a reference and whose id equals cids[i], and is left as
 *   allocated (zeroed by calloc) when no such column exists.
 *
 * The reader's decoder is cleared on every path once the entry has been loaded. Previously
 * a failed pColRef allocation skipped that clear.
 *
 * NOTE(review): on failure the reserved element stays in `infos`; callers log and continue.
 *
 * Returns 0 on success, otherwise the meta lookup error or terrno from a failed allocation.
 */
static int32_t setVtableInfo(SVnode* pVnode, SArray* infos, SArray* cids, int64_t uid, uint64_t gid, SMetaReader* metaReader, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  bool    entryLoaded = false;  // true once metaReader holds a decoded entry
  void*   pTask = sStreamReaderInfo->pTask;

  VTableInfo* vTable = taosArrayReserve(infos, 1);
  STREAM_CHECK_NULL_GOTO(vTable, terrno);
  vTable->uid = uid;
  vTable->gId = gid;

  ST_TASK_DLOG("vgId:%d %s put vtable uid:%"PRId64, TD_VID(pVnode), __func__, uid);

  code = sStreamReaderInfo->storageApi.metaReaderFn.getTableEntryByVersionUid(metaReader, sStreamReaderInfo->tableList.version, uid);
  if (code != 0) {
    ST_TASK_ELOG("vgId:%d %s get table entry by uid:%"PRId64" failed, msg:%s", TD_VID(pVnode), __func__, uid, tstrerror(code));
    goto end;
  }
  entryLoaded = true;

  if (atomic_load_8(&sStreamReaderInfo->isVtableOnlyTs) == 1) {
    vTable->cols.nCols = metaReader->me.colRef.nCols;
    vTable->cols.version = metaReader->me.colRef.version;
    vTable->cols.pColRef = taosMemoryCalloc(metaReader->me.colRef.nCols, sizeof(SColRef));
    STREAM_CHECK_NULL_GOTO(vTable->cols.pColRef, terrno);
    for (size_t j = 0; j < metaReader->me.colRef.nCols; j++) {
      memcpy(vTable->cols.pColRef + j, &metaReader->me.colRef.pColRef[j], sizeof(SColRef));
    }
  } else {
    size_t nCids = taosArrayGetSize(cids);
    vTable->cols.nCols = nCids;
    vTable->cols.version = metaReader->me.colRef.version;
    vTable->cols.pColRef = taosMemoryCalloc(nCids, sizeof(SColRef));
    STREAM_CHECK_NULL_GOTO(vTable->cols.pColRef, terrno);
    for (size_t i = 0; i < nCids; i++) {
      // The requested id does not change while scanning the table's columns.
      col_id_t wanted = *(col_id_t*)taosArrayGet(cids, i);
      for (size_t j = 0; j < metaReader->me.colRef.nCols; j++) {
        if (metaReader->me.colRef.pColRef[j].hasRef && metaReader->me.colRef.pColRef[j].id == wanted) {
          memcpy(vTable->cols.pColRef + i, &metaReader->me.colRef.pColRef[j], sizeof(SColRef));
          break;
        }
      }
    }
  }

end:
  if (entryLoaded) {
    tDecoderClear(&metaReader->coder);
  }
  return code;
}
3551

3552
/*
 * Fills vTableInfo->infos with one VTableInfo per table returned by
 * qStreamGetTableArrayList(), skipping NULL entries and entries marked as deleted.
 *
 * A failure of setVtableInfo() for a single table is logged as a warning and does not
 * abort the scan. The table list and its elements are freed before returning.
 *
 * Returns 0, or terrno when the table list or the result array cannot be obtained.
 */
static int32_t getAllVinfo(SVnode* pVnode, SStreamMsgVTableInfo* vTableInfo, SArray* cids, SMetaReader* metaReader, SStreamTriggerReaderInfo* sStreamReaderInfo){
  int32_t code = 0;
  int32_t lino = 0;
  void*   pTask = sStreamReaderInfo->pTask;

  SArray* pTables = qStreamGetTableArrayList(sStreamReaderInfo);
  STREAM_CHECK_NULL_GOTO(pTables, terrno);

  vTableInfo->infos = taosArrayInit(taosArrayGetSize(pTables), sizeof(VTableInfo));
  STREAM_CHECK_NULL_GOTO(vTableInfo->infos, terrno);

  for (size_t idx = 0; idx < taosArrayGetSize(pTables); ++idx) {
    SStreamTableKeyInfo* pEntry = taosArrayGetP(pTables, idx);
    if (pEntry != NULL && !pEntry->markedDeleted) {
      code = setVtableInfo(pVnode, vTableInfo->infos, cids, pEntry->uid, pEntry->groupId, metaReader, sStreamReaderInfo);
      if (code != 0) {
        // Best effort: report the table and carry on with the rest.
        ST_TASK_WLOG("vgId:%d %s set vtable info uid:%"PRId64" failed, msg:%s", TD_VID(pVnode), __func__, pEntry->uid, tstrerror(code));
        code = 0;
      }
    }
  }

end:
  taosArrayDestroyP(pTables, taosMemFree);
  return code;
}
3582

3583
/*
 * Fills vTableInfo->infos with one VTableInfo per uid listed in `uids`.
 *
 * The group id of each uid is looked up under the reader's read latch. A uid whose lookup
 * yields (uint64_t)-1 is logged and skipped. A failure of setVtableInfo() for a single
 * table is logged as a warning and does not abort the scan.
 *
 * Returns 0, or terrno when the result array or a uid element cannot be obtained.
 */
static int32_t getSpicificVinfo(SVnode* pVnode, SStreamMsgVTableInfo* vTableInfo, SArray* uids, SArray* cids, SMetaReader* metaReader, SStreamTriggerReaderInfo* sStreamReaderInfo){
  int32_t code = 0;
  int32_t lino = 0;
  void*   pTask = sStreamReaderInfo->pTask;

  vTableInfo->infos = taosArrayInit(taosArrayGetSize(uids), sizeof(VTableInfo));
  STREAM_CHECK_NULL_GOTO(vTableInfo->infos, terrno);

  for (size_t idx = 0; idx < taosArrayGetSize(uids); ++idx) {
    int64_t* pUid = taosArrayGet(uids, idx);
    STREAM_CHECK_NULL_GOTO(pUid, terrno);

    taosRLockLatch(&sStreamReaderInfo->lock);
    uint64_t gid = qStreamGetGroupIdFromOrigin(sStreamReaderInfo, *pUid);
    taosRUnLockLatch(&sStreamReaderInfo->lock);

    if (gid != (uint64_t)-1) {
      code = setVtableInfo(pVnode, vTableInfo->infos, cids, *pUid, gid, metaReader, sStreamReaderInfo);
      if (code != 0) {
        // Best effort: report the table and carry on with the rest.
        ST_TASK_WLOG("vgId:%d %s set vtable info uid:%"PRId64" failed, msg:%s", TD_VID(pVnode), __func__, *pUid, tstrerror(code));
        code = 0;
      }
    } else {
      ST_TASK_WLOG("vgId:%d %s uid:%"PRId64" not found in stream group", TD_VID(pVnode), __func__, *pUid);
    }
  }

end:
  return code;
}
3613

3614
/*
 * Answers a virtual-table info request with the column references of the reader's tables.
 *
 * When the request asks for exactly one column and it is PRIMARYKEY_TIMESTAMP_COL_ID, the
 * reader's isVtableOnlyTs flag is switched from 0 to 1. Then either all tables (fetchAllTable
 * set, or no uid list given) or only the listed uids are collected, and the result is
 * serialized with buildVTableInfoRsp().
 *
 * A request without a column-id list is rejected with TSDB_CODE_INVALID_PARA. The previous
 * code reported `terrno`, which nothing on that path sets, so the response could carry 0
 * or a stale error.
 *
 * Exactly one TDMT_STREAM_TRIGGER_PULL_RSP is sent on every path.
 */
static int32_t vnodeProcessStreamVTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t              code = 0;
  int32_t              lino = 0;
  void*                buf = NULL;
  size_t               size = 0;
  SStreamMsgVTableInfo vTableInfo = {0};
  SMetaReader          metaReader = {0};

  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  SArray* cids = req->virTableInfoReq.cids;
  STREAM_CHECK_NULL_GOTO(cids, TSDB_CODE_INVALID_PARA);

  if (taosArrayGetSize(cids) == 1 && *(col_id_t*)taosArrayGet(cids, 0) == PRIMARYKEY_TIMESTAMP_COL_ID){
    (void)atomic_val_compare_exchange_8(&sStreamReaderInfo->isVtableOnlyTs, 0, 1);
  }
  sStreamReaderInfo->storageApi.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &sStreamReaderInfo->storageApi.metaFn);

  if (req->virTableInfoReq.fetchAllTable || req->virTableInfoReq.uids == NULL || taosArrayGetSize(req->virTableInfoReq.uids) == 0) {
    STREAM_CHECK_RET_GOTO(getAllVinfo(pVnode, &vTableInfo, cids, &metaReader, sStreamReaderInfo));
  } else {
    STREAM_CHECK_RET_GOTO(getSpicificVinfo(pVnode, &vTableInfo, req->virTableInfoReq.uids, cids, &metaReader, sStreamReaderInfo));
  }
  ST_TASK_DLOG("vgId:%d %s end, size:%"PRIzu, TD_VID(pVnode), __func__, taosArrayGetSize(vTableInfo.infos));
  STREAM_CHECK_RET_GOTO(buildVTableInfoRsp(&vTableInfo, &buf, &size));

end:
  tDestroySStreamMsgVTableInfo(&vTableInfo);
  // Also reached with a zero-initialized reader when the request was rejected early.
  sStreamReaderInfo->storageApi.metaReaderFn.clearReader(&metaReader);
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  return code;
}
3650

3651
/*
 * Resolves each (refTableName, refColName) pair of req->origTableInfoReq.cols to the
 * referenced table's uid, its super-table uid (0 for a normal table) and the column id.
 *
 * One OTableInfoRsp is reserved per request element, in request order, even when the lookup
 * for that element fails. A table that cannot be found by name, or whose type is neither
 * child nor normal table, is logged and skipped. A column name that is not found is logged
 * and leaves cid as reserved.
 *
 * Fixes relative to the previous version:
 *  - the lookup failure is logged before `code` is reset, so the real error text is printed;
 *  - an unsupported table type no longer falls through to a NULL schema dereference;
 *  - a request without a column list is rejected with TSDB_CODE_INVALID_PARA instead of
 *    `terrno`, which nothing on that path sets.
 *
 * Exactly one TDMT_STREAM_TRIGGER_PULL_RSP is sent on every path.
 */
static int32_t vnodeProcessStreamOTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                   code = 0;
  int32_t                   lino = 0;
  void*                     buf = NULL;
  size_t                    size = 0;
  SSTriggerOrigTableInfoRsp oTableInfo = {0};
  SMetaReader               metaReader = {0};
  void*                     pTask = sStreamReaderInfo->pTask;

  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  SArray* cols = req->origTableInfoReq.cols;
  STREAM_CHECK_NULL_GOTO(cols, TSDB_CODE_INVALID_PARA);

  oTableInfo.cols = taosArrayInit(taosArrayGetSize(cols), sizeof(OTableInfoRsp));

  STREAM_CHECK_NULL_GOTO(oTableInfo.cols, terrno);

  sStreamReaderInfo->storageApi.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &sStreamReaderInfo->storageApi.metaFn);
  for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
    OTableInfo*    oInfo = taosArrayGet(cols, i);
    OTableInfoRsp* vTableInfo = taosArrayReserve(oTableInfo.cols, 1);
    STREAM_CHECK_NULL_GOTO(oInfo, terrno);
    STREAM_CHECK_NULL_GOTO(vTableInfo, terrno);
    code = sStreamReaderInfo->storageApi.metaReaderFn.getTableEntryByName(&metaReader, oInfo->refTableName);
    if (code != 0) {
      // Log while `code` still holds the failure, then treat the miss as non-fatal.
      ST_TASK_ELOG("vgId:%d %s get table entry by name:%s failed, msg:%s", TD_VID(pVnode), __func__, oInfo->refTableName, tstrerror(code));
      code = 0;
      continue;
    }
    vTableInfo->uid = metaReader.me.uid;
    ST_TASK_DLOG("vgId:%d %s get original uid:%"PRId64, TD_VID(pVnode), __func__, vTableInfo->uid);

    SSchemaWrapper* sSchemaWrapper = NULL;
    if (metaReader.me.type == TD_CHILD_TABLE) {
      // A child table has no schema of its own: reload the reader with its super table.
      int64_t suid = metaReader.me.ctbEntry.suid;
      vTableInfo->suid = suid;
      tDecoderClear(&metaReader.coder);
      STREAM_CHECK_RET_GOTO(sStreamReaderInfo->storageApi.metaReaderFn.getTableEntryByUid(&metaReader, suid));
      sSchemaWrapper = &metaReader.me.stbEntry.schemaRow;
    } else if (metaReader.me.type == TD_NORMAL_TABLE) {
      vTableInfo->suid = 0;
      sSchemaWrapper = &metaReader.me.ntbEntry.schemaRow;
    } else {
      // No schema to search for this table type; skip instead of dereferencing NULL.
      ST_TASK_ELOG("invalid table type:%d", metaReader.me.type);
      tDecoderClear(&metaReader.coder);
      continue;
    }

    for (size_t j = 0; j < sSchemaWrapper->nCols; j++) {
      SSchema* s = sSchemaWrapper->pSchema + j;
      if (strcmp(s->name, oInfo->refColName) == 0) {
        vTableInfo->cid = s->colId;
        break;
      }
    }
    if (vTableInfo->cid == 0) {
      stError("vgId:%d %s, not found col %s in table %s", TD_VID(pVnode), __func__, oInfo->refColName,
              oInfo->refTableName);
    }
    tDecoderClear(&metaReader.coder);
  }

  STREAM_CHECK_RET_GOTO(buildOTableInfoRsp(&oTableInfo, &buf, &size));

end:
  tDestroySTriggerOrigTableInfoRsp(&oTableInfo);
  // Also reached with a zero-initialized reader when the request was rejected early.
  sStreamReaderInfo->storageApi.metaReaderFn.clearReader(&metaReader);
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  return code;
}
3723

3724
/*
 * Handle a STRIGGER_PULL_VTABLE_PSEUDO_COL request.
 *
 * Looks up the virtual table identified by req->virTablePseudoColReq.uid and builds a
 * one-row data block with one column per requested column id
 * (req->virTablePseudoColReq.cids):
 *   - column id -1 yields a BINARY column filled with the table name;
 *   - any other id is matched against the super table's tag schema and filled with the
 *     child table's tag value; ids not found in the tag schema become NULL-typed
 *     columns holding a NULL value.
 * For a virtual normal table only the table name column is produced, and the first
 * requested id must be -1.
 *
 * A TDMT_STREAM_TRIGGER_PULL_RSP is always sent from this function, on success and on
 * failure alike.
 *
 * @param pVnode             vnode the request is executed on
 * @param pMsg               incoming RPC message; only pMsg->info is used, for the response
 * @param req                decoded pull request
 * @param sStreamReaderInfo  reader context providing the storage API
 * @return the code placed in the response; TSDB_CODE_STREAM_NO_DATA whenever no
 *         response payload was produced (size == 0), which includes every failure path
 */
static int32_t vnodeProcessStreamVTableTagInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t      code = 0;
  int32_t      lino = 0;
  void*        buf = NULL;
  size_t       size = 0;
  SSDataBlock* pBlock = NULL;

  SMetaReader metaReader = {0};
  SMetaReader metaReaderStable = {0};
  int64_t     streamId = req->base.streamId;
  stsDebug("vgId:%d %s start", TD_VID(pVnode), __func__);

  SArray* cols = req->virTablePseudoColReq.cids;
  STREAM_CHECK_NULL_GOTO(cols, terrno);

  sStreamReaderInfo->storageApi.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &sStreamReaderInfo->storageApi.metaFn);
  STREAM_CHECK_RET_GOTO(sStreamReaderInfo->storageApi.metaReaderFn.getTableEntryByUid(&metaReader, req->virTablePseudoColReq.uid));

  // Only virtual tables are valid targets for this request.
  STREAM_CHECK_CONDITION_GOTO(metaReader.me.type != TD_VIRTUAL_CHILD_TABLE && metaReader.me.type != TD_VIRTUAL_NORMAL_TABLE, TSDB_CODE_INVALID_PARA);

  STREAM_CHECK_RET_GOTO(createDataBlock(&pBlock));
  if (metaReader.me.type == TD_VIRTUAL_NORMAL_TABLE) {
    // A virtual normal table only supports the table name pseudo column (id -1).
    STREAM_CHECK_CONDITION_GOTO (taosArrayGetSize(cols) < 1 || *(col_id_t*)taosArrayGet(cols, 0) != -1, TSDB_CODE_INVALID_PARA);
    SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN, -1);
    STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, 1));
    pBlock->info.rows = 1;
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, 0);
    STREAM_CHECK_NULL_GOTO(pDst, terrno);
    STREAM_CHECK_RET_GOTO(varColSetVarData(pDst, 0, metaReader.me.name, strlen(metaReader.me.name), false));
  } else if (metaReader.me.type == TD_VIRTUAL_CHILD_TABLE){
    int64_t suid = metaReader.me.ctbEntry.suid;
    // The child reader's lock is released before a second locking reader is opened on
    // the super table. NOTE(review): presumably to avoid holding the meta lock twice - confirm.
    sStreamReaderInfo->storageApi.metaReaderFn.readerReleaseLock(&metaReader);
    sStreamReaderInfo->storageApi.metaReaderFn.initReader(&metaReaderStable, pVnode, META_READER_LOCK, &sStreamReaderInfo->storageApi.metaFn);

    STREAM_CHECK_RET_GOTO(sStreamReaderInfo->storageApi.metaReaderFn.getTableEntryByUid(&metaReaderStable, suid));
    SSchemaWrapper*  sSchemaWrapper = &metaReaderStable.me.stbEntry.schemaTag;

    // Pass 1: declare one output column per requested id, in request order.
    for (size_t i = 0; i < taosArrayGetSize(cols); i++){
      col_id_t* id = taosArrayGet(cols, i);
      STREAM_CHECK_NULL_GOTO(id, terrno);
      if (*id == -1) {
        SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN, -1);
        STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
        continue;
      }
      size_t j = 0;
      for (; j < sSchemaWrapper->nCols; j++) {
        SSchema* s = sSchemaWrapper->pSchema + j;
        if (s->colId == *id) {
          SColumnInfoData idata = createColumnInfoData(s->type, s->bytes, s->colId);
          STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
          break;
        }
      }
      if (j == sSchemaWrapper->nCols) {
        // Unknown tag id: keep the column position but mark it as a NULL-typed column.
        SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_NULL, CHAR_BYTES, *id);
        STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
      }
    }
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, 1));
    pBlock->info.rows = 1;

    // Pass 2: fill row 0 of every declared column.
    for (size_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++){
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
      STREAM_CHECK_NULL_GOTO(pDst, terrno);

      if (pDst->info.colId == -1) {
        STREAM_CHECK_RET_GOTO(varColSetVarData(pDst, 0, metaReader.me.name, strlen(metaReader.me.name), false));
        continue;
      }
      if (pDst->info.type == TSDB_DATA_TYPE_NULL) {
        STREAM_CHECK_RET_GOTO(colDataSetVal(pDst, 0, NULL, true));
        continue;
      }

      STagVal val = {0};
      val.cid = pDst->info.colId;
      const char* p = sStreamReaderInfo->storageApi.metaFn.extractTagVal(metaReader.me.ctbEntry.pTags, pDst->info.type, &val);

      char* data = NULL;
      if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
        data = tTagValToData((const STagVal*)p, false);
      } else {
        data = (char*)p;
      }

      // Capture the result instead of jumping straight to `end`, so the temporary buffer
      // below is released on the failure path as well as on success.
      int32_t setCode = colDataSetVal(pDst, 0, data,
                            (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)));

      // Same release condition as before: only non-JSON, var-length tag values are freed.
      // NOTE(review): presumably tTagValToData allocates only for those types - confirm.
      if ((pDst->info.type != TSDB_DATA_TYPE_JSON) && (p != NULL) && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
          (data != NULL)) {
        taosMemoryFree(data);
      }
      STREAM_CHECK_RET_GOTO(setCode);
    }
  } else {
    // Not reachable after the table type check above; kept as a defensive fallback.
    stError("vgId:%d %s, invalid table type:%d", TD_VID(pVnode), __func__, metaReader.me.type);
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  stsDebug("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlock->info.rows);
  printDataBlock(pBlock, __func__, "", streamId);
  STREAM_CHECK_RET_GOTO(buildRsp(pBlock, &buf, &size));

end:
  // No payload was built (this covers every failure path): report "no data".
  if(size == 0){
    code = TSDB_CODE_STREAM_NO_DATA;
  }
  sStreamReaderInfo->storageApi.metaReaderFn.clearReader(&metaReaderStable);
  sStreamReaderInfo->storageApi.metaReaderFn.clearReader(&metaReader);
  STREAM_PRINT_LOG_END(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  blockDataDestroy(pBlock);
  return code;
}
3841

3842
/*
 * Handle a TDMT_STREAM_FETCH message: run the stream calc scan task identified by
 * (queryId, taskId, execId) and send the produced blocks back as a TDMT_STREAM_FETCH_RSP.
 *
 * When req.reset is set, the executor task is created or reset first, using a read
 * handle bound to this vnode and the runtime function info carried by the request.
 * When the request carries operator parameters they are applied before execution.
 *
 * A response is always sent. TSDB_CODE_PAR_TABLE_NOT_EXIST and
 * TSDB_CODE_TDB_TABLE_NOT_EXIST are reported to the requester as success.
 *
 * @param pVnode  vnode the task runs on
 * @param pMsg    incoming RPC message carrying a serialized SResFetchReq
 * @param pInfo   queue info; pInfo->workerCb is handed to the read handle
 * @return the code placed in the response
 */
static int32_t vnodeProcessStreamFetchMsg(SVnode* pVnode, SRpcMsg* pMsg, SQueueInfo *pInfo) {
  int32_t            code = 0;
  int32_t            lino = 0;
  int32_t            rspCode = 0;
  void*              buf = NULL;
  size_t             size = 0;
  void*              taskAddr = NULL;
  SArray*            pResList = NULL;
  bool               hasNext = false;
  // Declared (and NULL-initialized) up front: the early `goto end` paths below must not
  // reach the task-scoped error log with an indeterminate pTask.
  void*              pTask = NULL;

  SResFetchReq req = {0};
  STREAM_CHECK_CONDITION_GOTO(tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0,
                              TSDB_CODE_QRY_INVALID_INPUT);
  SArray* calcInfoList = (SArray*)qStreamGetReaderInfo(req.queryId, req.taskId, &taskAddr);
  STREAM_CHECK_NULL_GOTO(calcInfoList, terrno);

  STREAM_CHECK_CONDITION_GOTO(req.execId < 0, TSDB_CODE_INVALID_PARA);
  SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo = taosArrayGetP(calcInfoList, req.execId);
  STREAM_CHECK_NULL_GOTO(sStreamReaderCalcInfo, terrno);
  sStreamReaderCalcInfo->rtInfo.execId = req.execId;

  pTask = sStreamReaderCalcInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, execId:%d, reset:%d, pTaskInfo:%p, scan type:%d", TD_VID(pVnode), __func__, req.execId, req.reset,
               sStreamReaderCalcInfo->pTaskInfo, nodeType(sStreamReaderCalcInfo->calcAst->pNode));

  if (req.reset) {
    // With a dynamic table name, the uid of the partition value flagged as tbname
    // selects the table to read; otherwise uid stays 0.
    int64_t uid = 0;
    if (req.dynTbname) {
      SArray* vals = req.pStRtFuncInfo->pStreamPartColVals;
      for (size_t i = 0; i < taosArrayGetSize(vals); ++i) {
        SStreamGroupValue* pValue = taosArrayGet(vals, i);
        if (pValue != NULL && pValue->isTbname) {
          uid = pValue->uid;
          break;
        }
      }
    }

    SReadHandle handle = {0};
    handle.vnode = pVnode;
    handle.pMsgCb = &pVnode->msgCb;
    handle.pWorkerCb = pInfo->workerCb;
    handle.uid = uid;
    handle.cacheSttStatis = true;

    initStorageAPI(&handle.api);
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(sStreamReaderCalcInfo->calcAst->pNode) ||
      QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == nodeType(sStreamReaderCalcInfo->calcAst->pNode)){
      // Table scans may carry a scan time range and an extended time range; process
      // whichever is present.
      STimeRangeNode* node = (STimeRangeNode*)((STableScanPhysiNode*)(sStreamReaderCalcInfo->calcAst->pNode))->pTimeRange;
      if (node != NULL) {
        STREAM_CHECK_RET_GOTO(processCalaTimeRange(sStreamReaderCalcInfo, &req, node, &handle, false));
      } else {
        ST_TASK_DLOG("vgId:%d %s no scan time range node", TD_VID(pVnode), __func__);
      }

      node = (STimeRangeNode*)((STableScanPhysiNode*)(sStreamReaderCalcInfo->calcAst->pNode))->pExtTimeRange;
      if (node != NULL) {
        STREAM_CHECK_RET_GOTO(processCalaTimeRange(sStreamReaderCalcInfo, &req, node, &handle, true));
      } else {
        ST_TASK_DLOG("vgId:%d %s no interp time range node", TD_VID(pVnode), __func__);
      }
    }

    // Swap the request's runtime function info into the calc info; the previous content
    // ends up in the request and is handled by tDestroySResFetchReq below.
    TSWAP(sStreamReaderCalcInfo->rtInfo.funcInfo, *req.pStRtFuncInfo);
    sStreamReaderCalcInfo->rtInfo.funcInfo.hasPlaceHolder = sStreamReaderCalcInfo->hasPlaceHolder;
    handle.streamRtInfo = &sStreamReaderCalcInfo->rtInfo;

    // Rebuild the task when there is none or qNeedReset() returns false; otherwise reset
    // the existing table scan in place.
    if (sStreamReaderCalcInfo->pTaskInfo == NULL || !qNeedReset(sStreamReaderCalcInfo->pTaskInfo)) {
      qDestroyTask(sStreamReaderCalcInfo->pTaskInfo);
      STREAM_CHECK_RET_GOTO(qCreateStreamExecTaskInfo(&sStreamReaderCalcInfo->pTaskInfo,
                                                    sStreamReaderCalcInfo->calcScanPlan, &handle, NULL, TD_VID(pVnode),
                                                    req.taskId));
    } else {
      STREAM_CHECK_RET_GOTO(qResetTableScan(sStreamReaderCalcInfo->pTaskInfo, &handle));
    }

    STREAM_CHECK_RET_GOTO(qSetTaskId(sStreamReaderCalcInfo->pTaskInfo, req.taskId, req.queryId));
  }

  if (req.pOpParam != NULL) {
    qUpdateOperatorParam(sStreamReaderCalcInfo->pTaskInfo, (void*)req.pOpParam);
  }

  pResList = taosArrayInit(4, POINTER_BYTES);
  STREAM_CHECK_NULL_GOTO(pResList, terrno);
  uint64_t ts = 0;
  STREAM_CHECK_RET_GOTO(qExecTaskOpt(sStreamReaderCalcInfo->pTaskInfo, pResList, &ts, &hasNext, NULL, req.pOpParam != NULL));

  for(size_t i = 0; i < taosArrayGetSize(pResList); i++){
    SSDataBlock* pBlock = taosArrayGetP(pResList, i);
    if (pBlock == NULL) continue;
    printDataBlock(pBlock, __func__, "fetch", ((SStreamTask*)pTask)->streamId);
  }

end:
  // Build the response payload with a plain call. A goto-on-failure macro here would jump
  // back to `end` forever if the build failed, and would overwrite an earlier error
  // already stored in `code`. The first error wins.
  rspCode = streamBuildFetchRsp(pResList, hasNext, &buf, &size, pVnode->config.tsdbCfg.precision);
  if (code == TSDB_CODE_SUCCESS && rspCode != TSDB_CODE_SUCCESS) {
    code = rspCode;
    lino = __LINE__;
  }
  taosArrayDestroy(pResList);
  streamReleaseTask(taskAddr);

  // A missing table is reported to the requester as success.
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_TDB_TABLE_NOT_EXIST){
    code = TDB_CODE_SUCCESS;
  }
  STREAM_PRINT_LOG_END(code, lino);
  SRpcMsg rsp = {.msgType = TDMT_STREAM_FETCH_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  tDestroySResFetchReq(&req);
  if (TDB_CODE_SUCCESS != code) {
    if (pTask != NULL) {
      ST_TASK_ELOG("vgId:%d %s failed, code:%d - %s", TD_VID(pVnode), __func__,
                   code, tstrerror(code));
    } else {
      // Failed before the task was resolved: log without task context.
      stError("vgId:%d %s failed, code:%d - %s", TD_VID(pVnode), __func__, code, tstrerror(code));
    }
  }
  return code;
}
3959

3960
/*
 * Lazily build the stream reader's table list.
 *
 * Returns 0 immediately when sStreamReaderInfo->tableList.pTableList is already set.
 * Otherwise takes the reader's write latch, records the vnode, initializes the storage
 * API, and - if the list is still unset once the latch is held - creates and populates
 * it, stamping it with the vnode's applied version. If populating fails, the freshly
 * created list info is destroyed again.
 *
 * @return 0 on success, otherwise the error returned by initStreamTableListInfo() or
 *         generateTablistForStreamReader()
 */
static int32_t initTableList(SStreamTriggerReaderInfo* sStreamReaderInfo, SVnode* pVnode) {
  int32_t ret = 0;

  // Check without the latch first; it is checked again once the latch is held.
  if (NULL != sStreamReaderInfo->tableList.pTableList) {
    return ret;
  }

  taosWLockLatch(&sStreamReaderInfo->lock);
  sStreamReaderInfo->pVnode = pVnode;
  initStorageAPI(&sStreamReaderInfo->storageApi);

  // The list may have been set between the check above and acquiring the latch.
  if (NULL != sStreamReaderInfo->tableList.pTableList) {
    goto unlock;
  }

  ret = initStreamTableListInfo(&sStreamReaderInfo->tableList);
  if (0 != ret) {
    goto unlock;
  }

  ret = generateTablistForStreamReader(pVnode, sStreamReaderInfo);
  if (0 != ret) {
    qStreamDestroyTableInfo(&sStreamReaderInfo->tableList);
    goto unlock;
  }

  sStreamReaderInfo->tableList.version = pVnode->state.applied;
  stDebug("vgId:%d %s init table list for stream reader, table num:%zu, version:%" PRId64,
          TD_VID(pVnode), __func__, taosArrayGetSize(sStreamReaderInfo->tableList.pTableList), sStreamReaderInfo->tableList.version);

unlock:
  taosWUnLockLatch(&sStreamReaderInfo->lock);
  return ret;
}
3984

3985
/*
 * Entry point for messages taken from the vnode's stream reader queue.
 *
 * - If the vnode is not ready for reads, the message is redirected and 0 is returned.
 * - TDMT_STREAM_FETCH is handed to vnodeProcessStreamFetchMsg().
 * - TDMT_STREAM_TRIGGER_PULL is decoded (the payload follows an SMsgHead), the reader
 *   context is looked up, its table list is initialized on first use, and the request
 *   is dispatched on req.base.type to the matching handler.
 * - Any other message type fails with TSDB_CODE_APP_ERROR.
 *
 * `sendRsp` is set to true once dispatch to a sub-handler begins; the calls here rely on
 * each sub-handler sending its own response. When it is false at `end` (failure before
 * dispatch, or an unknown request type), this function sends the response itself.
 *
 * @param pVnode  vnode processing the message
 * @param pMsg    incoming RPC message
 * @param pInfo   queue info, forwarded to the fetch handler
 * @return 0 when redirected, otherwise the handler's / dispatch result code
 */
int32_t vnodeProcessStreamReaderMsg(SVnode* pVnode, SRpcMsg* pMsg, SQueueInfo *pInfo) {
  int32_t                   code = 0;
  int32_t                   lino = 0;
  SSTriggerPullRequestUnion req = {0};
  void*                     taskAddr = NULL;
  bool                      sendRsp = false;

  vDebug("vgId:%d, msg:%p in stream reader queue is processing", pVnode->config.vgId, pMsg);
  if (!syncIsReadyForRead(pVnode->sync)) {
    vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
    return 0;
  }

  if (pMsg->msgType == TDMT_STREAM_FETCH) {
    return vnodeProcessStreamFetchMsg(pVnode, pMsg, pInfo);
  } else if (pMsg->msgType == TDMT_STREAM_TRIGGER_PULL) {
    // The payload must at least hold the SMsgHead prefix; without this check the length
    // computed below would wrap for a short or empty message.
    STREAM_CHECK_CONDITION_GOTO(pMsg->pCont == NULL || pMsg->contLen < (int32_t)sizeof(SMsgHead), TSDB_CODE_INVALID_PARA);
    void*   pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
    int32_t len = pMsg->contLen - (int32_t)sizeof(SMsgHead);
    STREAM_CHECK_RET_GOTO(tDeserializeSTriggerPullRequest(pReq, len, &req));
    stDebug("vgId:%d %s start, type:%d, streamId:%" PRIx64 ", readerTaskId:%" PRIx64 ", sessionId:%" PRIx64 ", applied:%" PRIx64,
            TD_VID(pVnode), __func__, req.base.type, req.base.streamId, req.base.readerTaskId, req.base.sessionId, pVnode->state.applied);
    SStreamTriggerReaderInfo* sStreamReaderInfo = qStreamGetReaderInfo(req.base.streamId, req.base.readerTaskId, &taskAddr);
    STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
    STREAM_CHECK_RET_GOTO(initTableList(sStreamReaderInfo, pVnode));
    // From here on the selected sub-handler is expected to send the response.
    sendRsp = true;
    switch (req.base.type) {
      case STRIGGER_PULL_SET_TABLE:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamSetTableReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_LAST_TS:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamLastTsReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_FIRST_TS:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamFirstTsReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_TSDB_META:
      case STRIGGER_PULL_TSDB_META_NEXT:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbMetaReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_TSDB_TS_DATA:
        if (sStreamReaderInfo->isVtableStream) {
          STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbTsDataReqVTable(pVnode, pMsg, &req, sStreamReaderInfo));
        } else {
          STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbTsDataReqNonVTable(pVnode, pMsg, &req, sStreamReaderInfo));
        }
        break;
      case STRIGGER_PULL_TSDB_TRIGGER_DATA:
      case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbTriggerDataReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_TSDB_CALC_DATA:
      case STRIGGER_PULL_TSDB_CALC_DATA_NEXT:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbCalcDataReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_TSDB_DATA:
      case STRIGGER_PULL_TSDB_DATA_NEXT:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbVirtalDataReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_GROUP_COL_VALUE:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamGroupColValueReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_VTABLE_INFO:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamVTableInfoReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_VTABLE_PSEUDO_COL:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamVTableTagInfoReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_OTABLE_INFO:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamOTableInfoReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_WAL_META_NEW:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalMetaNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_WAL_DATA_NEW:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalDataNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_WAL_META_DATA_NEW:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalMetaDataNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_WAL_CALC_DATA_NEW:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalCalcDataNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      default:
        vError("unknown inner msg type:%d in stream reader queue", req.base.type);
        // No sub-handler ran, so the response has to be sent from `end`.
        sendRsp = false;
        STREAM_CHECK_RET_GOTO(TSDB_CODE_APP_ERROR);
    }
  } else {
    vError("unknown msg type:%d in stream reader queue", pMsg->msgType);
    STREAM_CHECK_RET_GOTO(TSDB_CODE_APP_ERROR);
  }
end:

  streamReleaseTask(taskAddr);

  tDestroySTriggerPullRequest(&req);
  STREAM_PRINT_LOG_END(code, lino);
  if (!sendRsp) {
    // Nothing has answered the requester yet: reply with the outcome recorded in `code`.
    SRpcMsg rsp = {
      .code = code,
      .pCont = pMsg->info.rsp,
      .contLen = pMsg->info.rspLen,
      .info = pMsg->info,
    };
    tmsgSendRsp(&rsp);
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc