• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5002

24 Mar 2026 01:11AM UTC coverage: 72.671% (+0.4%) from 72.254%
#5002

push

travis-ci

web-flow
fix: possible memory leak in tdb; (#34872)

1 of 20 new or added lines in 2 files covered. (5.0%)

539 existing lines in 124 files now uncovered.

227507 of 313065 relevant lines covered (72.67%)

147556935.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.79
/source/dnode/vnode/src/vnd/vnodeStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include <stdbool.h>
17
#include <stdint.h>
18
#include <taos.h>
19
#include <tdef.h>
20
#include "executor.h"
21
#include "nodes.h"
22
#include "osMemPool.h"
23
#include "osMemory.h"
24
#include "scalar.h"
25
#include "stream.h"
26
#include "streamReader.h"
27
#include "taosdef.h"
28
#include "taoserror.h"
29
#include "tarray.h"
30
#include "tcommon.h"
31
#include "tdatablock.h"
32
#include "tdb.h"
33
#include "tdef.h"
34
#include "tencode.h"
35
#include "tglobal.h"
36
#include "thash.h"
37
#include "tlist.h"
38
#include "tlockfree.h"
39
#include "tmsg.h"
40
#include "tsimplehash.h"
41
#include "ttypes.h"
42
#include "vnd.h"
43
#include "vnode.h"
44
#include "vnodeInt.h"
45
#include "executor.h"
46

47
int32_t cacheTag(SVnode* pVnode, SHashObj* metaCache, SExprInfo* pExprInfo, int32_t numOfExpr, SStorageAPI* api, uint64_t uid, col_id_t colId, SRWLatch* lock);
48

49
#define BUILD_OPTION(options, _suid, _ver, _order, startTime, endTime, _schemas, _isSchema, _pSlotList)      \
50
  SStreamOptions                       options = {.suid = _suid,                                                   \
51
                                                  .ver = _ver,                                                     \
52
                                                  .order = _order,                                                 \
53
                                                  .twindows = {.skey = startTime, .ekey = endTime},                \
54
                                                  .schemas = _schemas,                                             \
55
                                                  .isSchema = _isSchema,                                           \
56
                                                  .pSlotList = _pSlotList};
57

58
typedef struct WalMetaResult {
59
  uint64_t    id;
60
  int64_t     skey;
61
  int64_t     ekey;
62
} WalMetaResult;
63

64
static int64_t getSuid(SStreamTriggerReaderInfo* sStreamReaderInfo, STableKeyInfo* pList) {
6,828,989✔
65
  int64_t suid = 0;
6,828,989✔
66
  if (!sStreamReaderInfo->isVtableStream) {
6,828,989✔
67
    suid = sStreamReaderInfo->suid;
6,436,522✔
68
    goto end;
6,436,522✔
69
  }
70

71
  if (pList == NULL) {
392,467✔
72
    goto end;
×
73
  }
74

75
  taosRLockLatch(&sStreamReaderInfo->lock);
392,467✔
76
  SStreamTableMapElement* element = taosHashGet(sStreamReaderInfo->vSetTableList.uIdMap, &pList->uid, LONG_BYTES);  
392,467✔
77
  if (element != 0) {
392,467✔
78
    suid = element->table->groupId;
225,663✔
79
    taosRUnLockLatch(&sStreamReaderInfo->lock);
225,663✔
80
    goto end;
225,663✔
81
  }
82
  taosRUnLockLatch(&sStreamReaderInfo->lock);
166,804✔
83

84
end:
6,828,989✔
85
  return suid;
6,828,989✔
86
}
87

88
static int64_t getSessionKey(int64_t session, int64_t type) { return (session | (type << 32)); }
6,756,556✔
89

90
int32_t sortCid(const void *lp, const void *rp) {
1,898,280✔
91
  int16_t* c1 = (int16_t*)lp;
1,898,280✔
92
  int16_t* c2 = (int16_t*)rp;
1,898,280✔
93

94
  if (*c1 < *c2) {
1,898,280✔
95
    return -1;
1,882,184✔
96
  } else if (*c1 > *c2) {
16,096✔
97
    return 1;
16,096✔
98
  }
99

100
  return 0;
×
101
}
102

103
int32_t sortSSchema(const void *lp, const void *rp) {
1,889,112✔
104
  SSchema* c1 = (SSchema*)lp;
1,889,112✔
105
  SSchema* c2 = (SSchema*)rp;
1,889,112✔
106

107
  if (c1->colId < c2->colId) {
1,889,112✔
108
    return -1;
1,873,016✔
109
  } else if (c1->colId > c2->colId) {
16,096✔
110
    return 1;
16,096✔
111
  }
112

113
  return 0;
×
114
}
115

116
static int32_t addColData(SSDataBlock* pResBlock, int32_t index, void* data) {
41,176,674✔
117
  SColumnInfoData* pSrc = taosArrayGet(pResBlock->pDataBlock, index);
41,176,674✔
118
  if (pSrc == NULL) {
41,182,188✔
119
    return terrno;
×
120
  }
121

122
  memcpy(pSrc->pData + pResBlock->info.rows * pSrc->info.bytes, data, pSrc->info.bytes);
41,182,188✔
123
  return 0;
41,180,011✔
124
}
125

126
static int32_t getTableDataInfo(SStreamReaderTaskInner* pTask, bool* hasNext) {
9,055,506✔
127
  int32_t code = pTask->storageApi->tsdReader.tsdNextDataBlock(pTask->pReader, hasNext);
9,055,506✔
128
  if (code != TSDB_CODE_SUCCESS) {
9,060,707✔
129
    pTask->storageApi->tsdReader.tsdReaderReleaseDataBlock(pTask->pReader);
×
130
  }
131

132
  return code;
9,056,353✔
133
}
134

135
static int32_t getTableData(SStreamReaderTaskInner* pTask, SSDataBlock** ppRes) {
1,417,058✔
136
  return pTask->storageApi->tsdReader.tsdReaderRetrieveDataBlock(pTask->pReader, ppRes);
1,417,058✔
137
}
138

139
static int32_t buildOTableInfoRsp(const SSTriggerOrigTableInfoRsp* rsp, void** data, size_t* size) {
133,213✔
140
  int32_t code = 0;
133,213✔
141
  int32_t lino = 0;
133,213✔
142
  void*   buf = NULL;
133,213✔
143
  int32_t len = tSerializeSTriggerOrigTableInfoRsp(NULL, 0, rsp);
133,213✔
144
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
133,213✔
145
  buf = rpcMallocCont(len);
133,213✔
146
  STREAM_CHECK_NULL_GOTO(buf, terrno);
132,978✔
147
  int32_t actLen = tSerializeSTriggerOrigTableInfoRsp(buf, len, rsp);
132,978✔
148
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
133,213✔
149
  *data = buf;
133,213✔
150
  *size = len;
133,213✔
151
  buf = NULL;
133,213✔
152
end:
133,213✔
153
  rpcFreeCont(buf);
133,213✔
154
  return code;
133,213✔
155
}
156

157
static bool ignoreMetaChange(int64_t tableListVer, int64_t ver) {
169,417✔
158
  stDebug("%s tableListVer:%" PRId64 " ver:%" PRId64, __func__, tableListVer, ver);
169,417✔
159
  return tableListVer >= ver;
169,417✔
160
}
161

162
static bool needReLoadTableList(SStreamTriggerReaderInfo* sStreamReaderInfo, int8_t tableType, int64_t suid, int64_t uid, bool isCalc){
4,844,883✔
163
  if ((tableType == TD_CHILD_TABLE || tableType == TD_VIRTUAL_CHILD_TABLE) &&
4,844,883✔
164
      sStreamReaderInfo->tableType == TD_SUPER_TABLE && 
2,188,407✔
165
      suid == sStreamReaderInfo->suid) {
795,480✔
166
    taosRLockLatch(&sStreamReaderInfo->lock);
10,543✔
167
    uint64_t gid = qStreamGetGroupIdFromOrigin(sStreamReaderInfo, uid);
10,543✔
168
    taosRUnLockLatch(&sStreamReaderInfo->lock);
10,543✔
169
    if (gid == (uint64_t)-1) return true;
10,543✔
170
  }
171
  return false;
4,834,418✔
172
}
173

174
static bool uidInTableList(SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t suid, int64_t uid, uint64_t* id){
12,069,016✔
175
  int32_t  ret = false;
12,069,016✔
176
  if (sStreamReaderInfo->tableType == TD_SUPER_TABLE) {
12,069,016✔
177
    if (suid != sStreamReaderInfo->suid) goto end;
7,106,393✔
178
    if (qStreamGetTableListNum(sStreamReaderInfo) == 0) goto end;
3,483,757✔
179
  } 
180
  *id = qStreamGetGroupIdFromOrigin(sStreamReaderInfo, uid);
8,443,230✔
181
  if (*id == -1) goto end;
8,443,012✔
182
  ret = true;
5,421,936✔
183

184
end:
12,072,500✔
185
  stTrace("%s ret:%d %p %p check suid:%" PRId64 " uid:%" PRId64 " gid:%"PRIu64, __func__, ret, sStreamReaderInfo, sStreamReaderInfo->tableList.gIdMap, suid, uid, *id);
12,072,500✔
186
  return ret;
12,068,782✔
187
}
188

189
static bool uidInTableListOrigin(SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t suid, int64_t uid, uint64_t* id) {
38,192✔
190
  return uidInTableList(sStreamReaderInfo, suid, uid, id);
38,192✔
191
}
192

193
static bool uidInTableListSet(SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t suid, int64_t uid, uint64_t* id, bool isCalc) {
55,103,641✔
194
  bool ret = false;
55,103,641✔
195
  taosRLockLatch(&sStreamReaderInfo->lock);
55,103,641✔
196
  if (sStreamReaderInfo->isVtableStream) {
55,114,321✔
197
    int64_t tmp[2] = {suid, uid};
43,082,569✔
198
    if(tSimpleHashGet(isCalc ? sStreamReaderInfo->uidHashCalc : sStreamReaderInfo->uidHashTrigger, tmp, sizeof(tmp)) != NULL) {
43,082,357✔
199
      *id = uid;
14,726,673✔
200
      ret = true;
14,726,885✔
201
    }
202
  } else {
203
    ret = uidInTableList(sStreamReaderInfo, suid, uid, id);
12,031,290✔
204
  }
205

206
end:
55,111,865✔
207
  taosRUnLockLatch(&sStreamReaderInfo->lock);
55,111,865✔
208
  return ret;
55,115,173✔
209
}
210

211
static int32_t  qTransformStreamTableList(SStreamTriggerReaderInfo* sStreamReaderInfo, void* pTableListInfo, StreamTableListInfo* tableInfo){
293,520✔
212
  SArray* pList = qStreamGetTableListArray(pTableListInfo);
293,520✔
213
  int32_t totalSize = taosArrayGetSize(pList);
293,520✔
214
  int32_t code = 0;
293,520✔
215
  void* pTask = sStreamReaderInfo->pTask;
293,520✔
216
  for (int32_t i = 0; i < totalSize; ++i) {
788,832✔
217
    STableKeyInfo* info = taosArrayGet(pList, i);
495,312✔
218
    if (info == NULL) {
495,312✔
219
      continue;
×
220
    }
221
    code = cacheTag(sStreamReaderInfo->pVnode, sStreamReaderInfo->pTableMetaCacheTrigger, sStreamReaderInfo->pExprInfoTriggerTag, sStreamReaderInfo->numOfExprTriggerTag, &sStreamReaderInfo->storageApi, info->uid, 0, NULL);
495,312✔
222
    if (code != 0){
495,312✔
223
      ST_TASK_WLOG("%s cacheTag trigger failed for uid:%" PRId64",code:%d", __func__, info->uid, code);
×
224
      continue;
×
225
    }
226
    code = cacheTag(sStreamReaderInfo->pVnode, sStreamReaderInfo->pTableMetaCacheCalc, sStreamReaderInfo->pExprInfoCalcTag, sStreamReaderInfo->numOfExprCalcTag, &sStreamReaderInfo->storageApi, info->uid, 0, NULL);
495,312✔
227
    if (code != 0){
495,312✔
228
      ST_TASK_WLOG("%s cacheTag calc failed for uid:%" PRId64",code:%d", __func__, info->uid, code);
×
229
      continue;
×
230
    }
231
    code = qStreamSetTableList(tableInfo, info->uid, info->groupId);
495,312✔
232
    if (code != 0){
495,312✔
233
      return code;
×
234
    }
235
  }
236
  return 0;
293,520✔
237
}
238

239
static int32_t generateTablistForStreamReader(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamReaderInfo) {
293,254✔
240
  int32_t                   code = 0;
293,254✔
241
  int32_t                   lino = 0;
293,254✔
242
  SNodeList* groupNew = NULL;   
293,254✔
243
  void* pTableListInfo = NULL;
293,520✔
244

245
  
246
  STREAM_CHECK_RET_GOTO(nodesCloneList(sStreamReaderInfo->partitionCols, &groupNew));
293,520✔
247

248
  STREAM_CHECK_RET_GOTO(qStreamCreateTableListForReader(pVnode, sStreamReaderInfo->suid, sStreamReaderInfo->uid, sStreamReaderInfo->tableType, groupNew,
292,756✔
249
                                         true, sStreamReaderInfo->pTagCond, sStreamReaderInfo->pTagIndexCond, &sStreamReaderInfo->storageApi, 
250
                                         &pTableListInfo, sStreamReaderInfo->groupIdMap));
251
  
252
  STREAM_CHECK_RET_GOTO(qTransformStreamTableList(sStreamReaderInfo, pTableListInfo, &sStreamReaderInfo->tableList));
293,520✔
253
  
254
  void* pTask = sStreamReaderInfo->pTask;
293,520✔
255
  ST_TASK_DLOG("vgId:%d %s tablelist size:%" PRIzu, TD_VID(pVnode), __func__, taosArrayGetSize(sStreamReaderInfo->tableList.pTableList));
293,520✔
256
end:
293,078✔
257
  nodesDestroyList(groupNew);
293,520✔
258
  qStreamDestroyTableList(pTableListInfo);
293,520✔
259
  STREAM_PRINT_LOG_END(code, lino);
293,520✔
260
  return code;
293,520✔
261
}
262

263
static int32_t buildVTableInfoRsp(const SStreamMsgVTableInfo* rsp, void** data, size_t* size) {
75,475✔
264
  int32_t code = 0;
75,475✔
265
  int32_t lino = 0;
75,475✔
266
  void*   buf = NULL;
75,475✔
267
  int32_t len = tSerializeSStreamMsgVTableInfo(NULL, 0, rsp);
75,475✔
268
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
75,475✔
269
  buf = rpcMallocCont(len);
75,475✔
270
  STREAM_CHECK_NULL_GOTO(buf, terrno);
75,475✔
271
  int32_t actLen = tSerializeSStreamMsgVTableInfo(buf, len, rsp);
75,475✔
272
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
75,475✔
273
  *data = buf;
75,475✔
274
  *size = len;
75,475✔
275
  buf = NULL;
75,475✔
276
end:
75,475✔
277
  rpcFreeCont(buf);
75,475✔
278
  return code;
75,475✔
279
}
280

281
static int32_t buildTsRsp(const SStreamTsResponse* tsRsp, void** data, size_t* size) {
523,628✔
282
  int32_t code = 0;
523,628✔
283
  int32_t lino = 0;
523,628✔
284
  void*   buf = NULL;
523,628✔
285
  int32_t len = tSerializeSStreamTsResponse(NULL, 0, tsRsp);
523,628✔
286
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
523,397✔
287
  buf = rpcMallocCont(len);
523,397✔
288
  STREAM_CHECK_NULL_GOTO(buf, terrno);
523,396✔
289
  int32_t actLen = tSerializeSStreamTsResponse(buf, len, tsRsp);
523,396✔
290
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
523,162✔
291
  *data = buf;
523,162✔
292
  *size = len;
523,162✔
293
  buf = NULL;
522,932✔
294
end:
522,932✔
295
  rpcFreeCont(buf);
522,932✔
296
  return code;
522,682✔
297
}
298

299

300
static int32_t buildRsp(SSDataBlock* pBlock, void** data, size_t* size) {
8,120,836✔
301
  int32_t code = 0;
8,120,836✔
302
  int32_t lino = 0;
8,120,836✔
303
  void*   buf = NULL;
8,120,836✔
304
  STREAM_CHECK_CONDITION_GOTO(pBlock == NULL || pBlock->info.rows == 0, TSDB_CODE_SUCCESS);
8,120,836✔
305
  size_t dataEncodeSize = blockGetEncodeSize(pBlock);
2,037,075✔
306
  buf = rpcMallocCont(dataEncodeSize);
2,036,824✔
307
  STREAM_CHECK_NULL_GOTO(buf, terrno);
2,036,835✔
308
  int32_t actualLen = blockEncode(pBlock, buf, dataEncodeSize, taosArrayGetSize(pBlock->pDataBlock));
2,036,835✔
309
  STREAM_CHECK_CONDITION_GOTO(actualLen < 0, terrno);
2,037,075✔
310
  *data = buf;
2,037,075✔
311
  *size = dataEncodeSize;
2,037,075✔
312
  buf = NULL;
2,037,075✔
313
end:
8,124,750✔
314
  rpcFreeCont(buf);
8,124,750✔
315
  return code;
8,121,214✔
316
}
317

318
static int32_t buildArrayRsp(SArray* pBlockList, void** data, size_t* size) {
56,145✔
319
  int32_t code = 0;
56,145✔
320
  int32_t lino = 0;
56,145✔
321

322
  void*   buf = NULL;
56,145✔
323

324
  int32_t blockNum = 0;
56,145✔
325
  size_t  dataEncodeBufSize = 0;
56,145✔
326
  for(size_t i = 0; i < taosArrayGetSize(pBlockList); i++){
117,543✔
327
    SSDataBlock* pBlock = taosArrayGetP(pBlockList, i);
61,398✔
328
    if (pBlock == NULL || pBlock->info.rows == 0) continue;
61,398✔
329
    int32_t blockSize = blockGetEncodeSize(pBlock);
61,398✔
330
    dataEncodeBufSize += blockSize;
61,398✔
331
    blockNum++;
61,398✔
332
  }
333
  buf = rpcMallocCont(INT_BYTES + dataEncodeBufSize);
56,145✔
334
  STREAM_CHECK_NULL_GOTO(buf, terrno);
56,145✔
335

336
  char* dataBuf = (char*)buf;
56,145✔
337
  *((int32_t*)(dataBuf)) = blockNum;
56,145✔
338
  dataBuf += INT_BYTES;
56,145✔
339
  for(size_t i = 0; i < taosArrayGetSize(pBlockList); i++){
117,543✔
340
    SSDataBlock* pBlock = taosArrayGetP(pBlockList, i);
61,398✔
341
    if (pBlock == NULL || pBlock->info.rows == 0) continue;
61,398✔
342
    int32_t actualLen = blockEncode(pBlock, dataBuf, dataEncodeBufSize, taosArrayGetSize(pBlock->pDataBlock));
61,398✔
343
    STREAM_CHECK_CONDITION_GOTO(actualLen < 0, terrno);
61,398✔
344
    dataBuf += actualLen;
61,398✔
345
  }
346
  *data = buf;
56,145✔
347
  *size = INT_BYTES + dataEncodeBufSize;
56,145✔
348
  buf = NULL;
56,145✔
349
end:
56,145✔
350
  rpcFreeCont(buf);
56,145✔
351
  return code;
56,145✔
352
}
353

354
static int32_t buildWalMetaBlock(SSDataBlock* pBlock, int8_t type, int64_t id, bool isVTable, int64_t uid,
×
355
                                 int64_t skey, int64_t ekey, int64_t ver, int64_t rows) {
356
  int32_t code = 0;
×
357
  int32_t lino = 0;
×
358
  int32_t index = 0;
×
359
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &type));
×
360
  if (!isVTable) {
×
361
    STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &id));
×
362
  }
363
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &uid));
×
364
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &skey));
×
365
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &ekey));
×
366
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &ver));
×
367
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &rows));
×
368

369
end:
×
370
  // STREAM_PRINT_LOG_END(code, lino)
371
  return code;
×
372
}
373

374
static int32_t buildWalMetaBlockNew(SSDataBlock* pBlock, int64_t id, int64_t skey, int64_t ekey, int64_t ver) {
9,930,842✔
375
  int32_t code = 0;
9,930,842✔
376
  int32_t lino = 0;
9,930,842✔
377
  int32_t index = 0;
9,930,842✔
378
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &id));
9,930,842✔
379
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &skey));
9,930,406✔
380
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &ekey));
9,928,867✔
381
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &ver));
9,930,101✔
382

383
end:
9,930,772✔
384
  return code;
9,930,772✔
385
}
386

387
static int32_t buildTableBlock(SSDataBlock* pBlock, int64_t id, int64_t ver, ETableBlockType type) {
3,632✔
388
  int32_t code = 0;
3,632✔
389
  int32_t lino = 0;
3,632✔
390
  int32_t index = 0;
3,632✔
391
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &id));
3,632✔
392
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &ver));
3,632✔
393
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &type));
3,632✔
394

395
end:
3,632✔
396
  return code;
3,632✔
397
}
398

399
static void buildTSchema(STSchema* pTSchema, int32_t ver, col_id_t colId, int8_t type, int32_t bytes) {
×
400
  pTSchema->numOfCols = 1;
×
401
  pTSchema->version = ver;
×
402
  pTSchema->columns[0].colId = colId;
×
403
  pTSchema->columns[0].type = type;
×
404
  pTSchema->columns[0].bytes = bytes;
×
405
}
×
406

407
static int32_t scanDeleteDataNew(SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* rsp, void* data, int32_t len,
30,136✔
408
                              int64_t ver) {
409
  int32_t    code = 0;
30,136✔
410
  int32_t    lino = 0;
30,136✔
411
  SDecoder   decoder = {0};
30,136✔
412
  SDeleteRes req = {0};
30,136✔
413
  void* pTask = sStreamReaderInfo->pTask;
30,136✔
414

415
  req.uidList = taosArrayInit(0, sizeof(tb_uid_t));
30,136✔
416
  tDecoderInit(&decoder, data, len);
30,136✔
417
  STREAM_CHECK_RET_GOTO(tDecodeDeleteRes(&decoder, &req));
30,136✔
418
  STREAM_CHECK_CONDITION_GOTO((sStreamReaderInfo->tableType == TSDB_SUPER_TABLE && !sStreamReaderInfo->isVtableStream && req.suid != sStreamReaderInfo->suid), TDB_CODE_SUCCESS);
30,136✔
419
  
420
  for (int32_t i = 0; i < taosArrayGetSize(req.uidList); i++) {
46,721✔
421
    uint64_t* uid = taosArrayGet(req.uidList, i);
26,812✔
422
    STREAM_CHECK_NULL_GOTO(uid, terrno);
26,812✔
423
    uint64_t   id = 0;
26,812✔
424
    ST_TASK_DLOG("stream reader scan delete start data:uid %" PRIu64 ", skey %" PRIu64 ", ekey %" PRIu64, *uid, req.skey, req.ekey);
26,812✔
425
    STREAM_CHECK_CONDITION_GOTO(!uidInTableListSet(sStreamReaderInfo, req.suid, *uid, &id, false), TDB_CODE_SUCCESS);
26,812✔
426
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->deleteBlock, ((SSDataBlock*)rsp->deleteBlock)->info.rows + 1));
19,909✔
427
    STREAM_CHECK_RET_GOTO(buildWalMetaBlockNew(rsp->deleteBlock, id, req.skey, req.ekey, ver));
19,909✔
428
    ((SSDataBlock*)rsp->deleteBlock)->info.rows++;
19,909✔
429
    rsp->totalRows++;
19,909✔
430
  }
431

432
end:
30,136✔
433
  taosArrayDestroy(req.uidList);
30,136✔
434
  tDecoderClear(&decoder);
30,136✔
435
  return code;
30,136✔
436
}
437

438
static int32_t createBlockForProcessMeta(SSDataBlock** pBlock) {
2,964✔
439
  int32_t code = 0;
2,964✔
440
  int32_t lino = 0;
2,964✔
441
  SArray* schemas = NULL;
2,964✔
442

443
  schemas = taosArrayInit(8, sizeof(SSchema));
2,964✔
444
  STREAM_CHECK_NULL_GOTO(schemas, terrno);
2,964✔
445

446
  int32_t index = 0;
2,964✔
447
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // gid non vtable/uid vtable
2,964✔
448
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // ver
2,964✔
449
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TINYINT, CHAR_BYTES, index++))  // type
2,964✔
450

451
  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, pBlock));
2,964✔
452

453
end:
2,964✔
454
  taosArrayDestroy(schemas);
2,964✔
455
  return code;
2,964✔
456
}
457

458
static int32_t addOneRow(void** tmp, int64_t id, int64_t ver, ETableBlockType type) {
3,632✔
459
  int32_t  code = 0;
3,632✔
460
  int32_t  lino = 0;
3,632✔
461
  if (*tmp == NULL) {
3,632✔
462
    STREAM_CHECK_RET_GOTO(createBlockForProcessMeta((SSDataBlock**)tmp));
2,964✔
463
  }
464
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(*tmp, ((SSDataBlock*)(*tmp))->info.rows + 1));
3,632✔
465
  STREAM_CHECK_RET_GOTO(buildTableBlock(*tmp, id, ver, type));
3,632✔
466
  ((SSDataBlock*)(*tmp))->info.rows++;
3,632✔
467
  
468
end:
3,632✔
469
  return code;
3,632✔
470
}
471

472
static int32_t addUidListToBlock(SArray* uidListAdd, void** block, int64_t ver, int32_t* totalRows, ETableBlockType type) {
94,894✔
473
  for (int32_t i = 0; i < taosArrayGetSize(uidListAdd); ++i) {
98,526✔
474
    uint64_t* uid = taosArrayGet(uidListAdd, i);
3,632✔
475
    if (uid == NULL) {
3,632✔
476
      continue;
×
477
    }
478
    int32_t code = addOneRow(block, *uid, ver, type);
3,632✔
479
    if (code != 0) {
3,632✔
480
      return code;
×
481
    }
482
    (*totalRows)++;
3,632✔
483
  }
484
  return 0;
94,894✔
485
}
486

487
static int32_t qStreamGetAddTable(SStreamTriggerReaderInfo* sStreamReaderInfo, SArray* tableListAdd, SArray* uidListAdd) {
45,470✔
488
  int32_t      code = 0;
45,470✔
489
  int32_t      lino = 0;
45,470✔
490
  if (uidListAdd == NULL) {
45,470✔
491
    return 0;
38,718✔
492
  }
493
  void* pTask = sStreamReaderInfo->pTask;
6,752✔
494
  
495
  taosRLockLatch(&sStreamReaderInfo->lock);
6,752✔
496
  int32_t totalSize = taosArrayGetSize(tableListAdd);
6,752✔
497
  for (int32_t i = 0; i < totalSize; ++i) {
9,052✔
498
    STableKeyInfo* info = taosArrayGet(tableListAdd, i);
2,300✔
499
    if (info == NULL) {
2,300✔
500
      continue;
×
501
    }
502
    if (taosHashGet(sStreamReaderInfo->tableList.uIdMap, &info->uid, LONG_BYTES) != NULL) {
2,300✔
503
      continue;
×
504
    }
505
    STREAM_CHECK_NULL_GOTO(taosArrayPush(uidListAdd, &info->uid), terrno);
4,600✔
506
    ST_TASK_WLOG("%s real add table to list for uid:%" PRId64, __func__, info->uid);
2,300✔
507
  }
508

509
end:
6,752✔
510
  taosRUnLockLatch(&sStreamReaderInfo->lock);
6,752✔
511
  return code;
6,752✔
512
}
513

514
static int32_t qStreamGetDelTable(SStreamTriggerReaderInfo* sStreamReaderInfo, SArray* tableListDel, SArray* uidListDel) {
53,384✔
515
  int32_t      code = 0;
53,384✔
516
  int32_t      lino = 0;
53,384✔
517
  if (uidListDel == NULL) {
53,384✔
518
    return 0;
×
519
  }
520
  void* pTask = sStreamReaderInfo->pTask;
53,384✔
521
  
522
  taosRLockLatch(&sStreamReaderInfo->lock);
53,384✔
523
  int32_t totalSize = taosArrayGetSize(tableListDel);
53,384✔
524
  for (int32_t i = 0; i < totalSize; ++i) {
54,256✔
525
    int64_t* uid = taosArrayGet(tableListDel, i);
872✔
526
    if (uid == NULL) {
872✔
527
      continue;
×
528
    }
529
    if (taosHashGet(sStreamReaderInfo->tableList.uIdMap, uid, LONG_BYTES) == NULL) {
872✔
530
      continue;
×
531
    }
532
    STREAM_CHECK_NULL_GOTO(taosArrayPush(uidListDel, uid), terrno);
872✔
533
    ST_TASK_WLOG("%s real del table from list for uid:%" PRId64, __func__, *uid);
872✔
534
  }
535

536
end:
53,384✔
537
  taosRUnLockLatch(&sStreamReaderInfo->lock);
53,384✔
538
  return code;
53,384✔
539
}
540

541
static int32_t scanDropTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* rsp, void* data, int32_t len,
33,028✔
542
                             int64_t ver) {
543
  int32_t  code = 0;
33,028✔
544
  int32_t  lino = 0;
33,028✔
545
  SDecoder decoder = {0};
33,028✔
546
  void* pTask = sStreamReaderInfo->pTask;
33,028✔
547
  SArray* uidList = NULL;
33,028✔
548
  SArray* uidListDel = NULL;
33,028✔
549
  SArray* uidListDelOutTbl = NULL;
33,028✔
550
  SVDropTbBatchReq req = {0};
33,028✔
551
  tDecoderInit(&decoder, data, len);
33,028✔
552
  STREAM_CHECK_RET_GOTO(tDecodeSVDropTbBatchReq(&decoder, &req));
33,028✔
553

554
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
66,056✔
555
    SVDropTbReq* pDropTbReq = req.pReqs + iReq;
33,028✔
556
    STREAM_CHECK_NULL_GOTO(pDropTbReq, TSDB_CODE_INVALID_PARA);
33,028✔
557
    uint64_t id = 0;
33,028✔
558
    if(!uidInTableListOrigin(sStreamReaderInfo, pDropTbReq->suid, pDropTbReq->uid, &id)) {
33,028✔
559
      continue;
32,156✔
560
    }
561

562
    if (sStreamReaderInfo->deleteOutTbl != 0) {
872✔
563
      if (uidListDelOutTbl == NULL) {
×
564
        uidListDelOutTbl = taosArrayInit(8, sizeof(tb_uid_t));
×
565
        STREAM_CHECK_NULL_GOTO(uidListDelOutTbl, terrno);
×
566
      }
567
      STREAM_CHECK_NULL_GOTO(taosArrayPush(uidListDelOutTbl, &pDropTbReq->uid), terrno);
×
568
    }
569
    if (sStreamReaderInfo->isVtableStream) {
872✔
570
      if (uidList == NULL) {
872✔
571
        uidList = taosArrayInit(8, sizeof(tb_uid_t));
872✔
572
        STREAM_CHECK_NULL_GOTO(uidList, terrno);
872✔
573
      }
574
      STREAM_CHECK_NULL_GOTO(taosArrayPush(uidList, &pDropTbReq->uid), terrno);
1,744✔
575
    }
576
    
577
    ST_TASK_DLOG("stream reader scan drop uid %" PRId64 ", id %" PRIu64, pDropTbReq->uid, id);
872✔
578
  }
579
  STREAM_CHECK_RET_GOTO(addUidListToBlock(uidListDelOutTbl, &rsp->tableBlock, ver, &rsp->totalRows, TABLE_BLOCK_DROP));
33,028✔
580

581
  if (sStreamReaderInfo->isVtableStream) {
33,028✔
582
    uidListDel = taosArrayInit(8, sizeof(tb_uid_t));
33,028✔
583
    STREAM_CHECK_NULL_GOTO(uidListDel, terrno);
33,028✔
584
    STREAM_CHECK_RET_GOTO(qStreamGetDelTable(sStreamReaderInfo, uidList, uidListDel));
33,028✔
585
    STREAM_CHECK_RET_GOTO(addUidListToBlock(uidListDel, &rsp->tableBlock, ver, &rsp->totalRows, TABLE_BLOCK_RETIRE));
33,028✔
586
  }
587
  
588
end:
33,028✔
589
  taosArrayDestroy(uidList);
33,028✔
590
  taosArrayDestroy(uidListDel);
33,028✔
591
  taosArrayDestroy(uidListDelOutTbl);
33,028✔
592
  tDecoderClear(&decoder);
33,028✔
593
  return code;
33,028✔
594
}
595

596
static int32_t qStreamModifyTableList(SStreamTriggerReaderInfo* sStreamReaderInfo, SArray* tableListAdd, SArray* tableListDel) {
46,907✔
597
  int32_t      code = 0;
46,907✔
598
  int32_t      lino = 0;
46,907✔
599
  void* pTask = sStreamReaderInfo->pTask;
46,907✔
600
  
601
  taosWLockLatch(&sStreamReaderInfo->lock);
46,907✔
602
  int32_t totalSize = taosArrayGetSize(tableListDel);
46,907✔
603
  for (int32_t i = 0; i < totalSize; ++i) {
46,907✔
604
    int64_t* uid = taosArrayGet(tableListDel, i);
×
605
    if (uid == NULL) {
×
606
      continue;
×
607
    }
608
    STREAM_CHECK_RET_GOTO(qStreamRemoveTableList(&sStreamReaderInfo->tableList, *uid));
×
609
  }
610

611
  totalSize = taosArrayGetSize(tableListAdd);
46,907✔
612
  for (int32_t i = 0; i < totalSize; ++i) {
76,722✔
613
    STableKeyInfo* info = taosArrayGet(tableListAdd, i);
29,815✔
614
    if (info == NULL) {
29,815✔
615
      continue;
×
616
    }
617
    int ret = cacheTag(sStreamReaderInfo->pVnode, sStreamReaderInfo->pTableMetaCacheTrigger, sStreamReaderInfo->pExprInfoTriggerTag, sStreamReaderInfo->numOfExprTriggerTag, &sStreamReaderInfo->storageApi, info->uid, 0, NULL);
29,815✔
618
    if (ret != 0){
29,815✔
619
      ST_TASK_WLOG("%s cacheTag trigger failed for uid:%" PRId64",code:%d", __func__, info->uid, ret);
6,890✔
620
      continue;
6,890✔
621
    }
622
    ret = cacheTag(sStreamReaderInfo->pVnode, sStreamReaderInfo->pTableMetaCacheCalc, sStreamReaderInfo->pExprInfoCalcTag, sStreamReaderInfo->numOfExprCalcTag, &sStreamReaderInfo->storageApi, info->uid, 0, NULL);
22,925✔
623
    if (ret != 0){
22,925✔
624
      ST_TASK_WLOG("%s cacheTag calc failed for uid:%" PRId64",code:%d", __func__, info->uid, ret);
×
625
      continue;
×
626
    }
627
    STREAM_CHECK_RET_GOTO(qStreamRemoveTableList(&sStreamReaderInfo->tableList, info->uid));
22,925✔
628
    STREAM_CHECK_RET_GOTO(qStreamSetTableList(&sStreamReaderInfo->tableList, info->uid, info->groupId));
22,925✔
629
  }
630

631
end:
46,907✔
632
  taosWUnLockLatch(&sStreamReaderInfo->lock);
46,907✔
633
  return code;
46,907✔
634
}
635

636
static int32_t processTableList(SStreamTriggerReaderInfo* sStreamReaderInfo, SArray* uidList, SArray** tableList) {
46,907✔
637
  int32_t code = 0;
46,907✔
638
  int32_t lino = 0;
46,907✔
639
  SNodeList* groupNew = NULL;   
46,907✔
640

641
  if (taosArrayGetSize(uidList) == 0) {
46,907✔
642
    return 0;
17,092✔
643
  }
644
  STREAM_CHECK_RET_GOTO(nodesCloneList(sStreamReaderInfo->partitionCols, &groupNew));  
29,815✔
645
  STREAM_CHECK_RET_GOTO(qStreamFilterTableListForReader(sStreamReaderInfo->pVnode, uidList, groupNew, sStreamReaderInfo->pTagCond,
29,815✔
646
                                                    sStreamReaderInfo->pTagIndexCond, &sStreamReaderInfo->storageApi,
647
                                                    sStreamReaderInfo->groupIdMap, sStreamReaderInfo->suid, tableList));
648

649
end:
29,815✔
650
  nodesDestroyList(groupNew);
29,815✔
651
  return code;
29,815✔
652
}
653

654
static int32_t scanCreateTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* rsp, void* data, int32_t len,
25,114✔
655
                             int64_t ver) {
656
  int32_t  code = 0;
25,114✔
657
  int32_t  lino = 0;
25,114✔
658
  SDecoder decoder = {0};
25,114✔
659
  SArray*  uidList = NULL;
25,114✔
660
  SArray*  tableList = NULL;
25,114✔
661
  SArray*  uidListAdd = NULL;
25,114✔
662
  void* pTask = sStreamReaderInfo->pTask;
25,114✔
663

664
  SVCreateTbBatchReq req = {0};
25,114✔
665
  tDecoderInit(&decoder, data, len);
25,114✔
666
  
667
  STREAM_CHECK_RET_GOTO(tDecodeSVCreateTbBatchReq(&decoder, &req));
25,114✔
668

669
  uidList = taosArrayInit(8, sizeof(tb_uid_t));
25,114✔
670
  STREAM_CHECK_NULL_GOTO(uidList, terrno);
25,114✔
671

672
  if (sStreamReaderInfo->isVtableStream) {
25,114✔
673
    uidListAdd = taosArrayInit(8, sizeof(tb_uid_t));
5,120✔
674
    STREAM_CHECK_NULL_GOTO(uidListAdd, terrno);
5,120✔
675
  }
676
  
677
  SVCreateTbReq* pCreateReq = NULL;
25,114✔
678
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
50,228✔
679
    pCreateReq = req.pReqs + iReq;
25,114✔
680
    if (!needReLoadTableList(sStreamReaderInfo, pCreateReq->type, pCreateReq->ctb.suid, pCreateReq->uid, false)) {
25,114✔
681
      ST_TASK_DLOG("stream reader scan create table jump, %s", pCreateReq->name);
17,092✔
682
      continue;
17,092✔
683
    }
684
    ST_TASK_ILOG("stream reader scan create table %s", pCreateReq->name);
8,022✔
685
    STREAM_CHECK_NULL_GOTO(taosArrayPush(uidList, &pCreateReq->uid), terrno);
16,044✔
686
  }
687
  
688
  STREAM_CHECK_RET_GOTO(processTableList(sStreamReaderInfo, uidList, &tableList));
25,114✔
689
  STREAM_CHECK_RET_GOTO(qStreamGetAddTable(sStreamReaderInfo, tableList, uidListAdd));
25,114✔
690
  STREAM_CHECK_RET_GOTO(addUidListToBlock(uidListAdd, &rsp->tableBlock, ver, &rsp->totalRows, TABLE_BLOCK_ADD));
25,114✔
691

692
  STREAM_CHECK_RET_GOTO(qStreamModifyTableList(sStreamReaderInfo, tableList, uidList));
25,114✔
693
end:
25,114✔
694
  taosArrayDestroy(uidList);
25,114✔
695
  taosArrayDestroy(uidListAdd);
25,114✔
696
  taosArrayDestroy(tableList);
25,114✔
697
  tDeleteSVCreateTbBatchReq(&req);
25,114✔
698
  tDecoderClear(&decoder);
25,114✔
699
  return code;
25,114✔
700
}
701

702
static int32_t processAutoCreateTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, SVCreateTbReq* pCreateReq, int64_t ver) {
4,819,093✔
703
  int32_t  code = 0;
4,819,093✔
704
  int32_t  lino = 0;
4,819,093✔
705
  void*    pTask = sStreamReaderInfo->pTask;
4,819,093✔
706
  SArray*  uidList = NULL;
4,819,537✔
707
  SArray*  tableList = NULL;
4,819,537✔
708

709
  ST_TASK_DLOG("%s start, name:%s uid:%"PRId64, __func__, pCreateReq->name, pCreateReq->uid);
4,819,005✔
710
  if (!needReLoadTableList(sStreamReaderInfo, pCreateReq->type, pCreateReq->ctb.suid, pCreateReq->uid, false) ||
4,821,748✔
711
      ignoreMetaChange(sStreamReaderInfo->tableList.version, ver)) {
1,979✔
712
    ST_TASK_DLOG("stream reader scan auto create table jump, %s", pCreateReq->name);
4,817,404✔
713
    goto end;
4,817,636✔
714
  }
715
  uidList = taosArrayInit(8, sizeof(tb_uid_t));
1,437✔
716
  STREAM_CHECK_NULL_GOTO(uidList, terrno);
1,437✔
717
  STREAM_CHECK_NULL_GOTO(taosArrayPush(uidList, &pCreateReq->uid), terrno);
2,874✔
718
  ST_TASK_DLOG("stream reader scan auto create table %s", pCreateReq->name);
1,437✔
719

720
  STREAM_CHECK_RET_GOTO(processTableList(sStreamReaderInfo, uidList, &tableList));
1,437✔
721
  STREAM_CHECK_RET_GOTO(qStreamModifyTableList(sStreamReaderInfo, tableList, uidList));
1,437✔
722
end:
4,816,376✔
723
  taosArrayDestroy(uidList);
4,819,073✔
724
  taosArrayDestroy(tableList);
4,818,609✔
725
  return code;
4,818,377✔
726
}
727

728
static bool isColIdInList(SNodeList* colList, col_id_t cid){
460✔
729
  int32_t  code = 0;
460✔
730
  int32_t  lino = 0;
460✔
731
  SNode*  nodeItem = NULL;
460✔
732
  FOREACH(nodeItem, colList) {
1,150✔
733
    SNode*           pNode = ((STargetNode*)nodeItem)->pExpr;
1,150✔
734
    if (nodeType(pNode) == QUERY_NODE_COLUMN) {
1,150✔
735
      SColumnNode*     valueNode = (SColumnNode*)(pNode);
1,150✔
736
      if (cid == valueNode->colId) {
1,150✔
737
        return true;
460✔
738
      }
739
    }
740
  }
741
end:
×
742
  return false;
×
743
}
744

745
static bool isAlteredTable(int8_t action, ETableType tbType) {
25,520✔
746
  if (action == TSDB_ALTER_TABLE_UPDATE_MULTI_TABLE_TAG_VAL && tbType == TSDB_CHILD_TABLE) {
25,520✔
747
    return true;
20,356✔
748
  } else if (action == TSDB_ALTER_TABLE_UPDATE_CHILD_TABLE_TAG_VAL && tbType == TSDB_SUPER_TABLE) {
5,164✔
749
    return true;
×
750
  } else if ((action == TSDB_ALTER_TABLE_ALTER_COLUMN_REF || action == TSDB_ALTER_TABLE_REMOVE_COLUMN_REF) && 
5,164✔
751
     (tbType == TSDB_VIRTUAL_CHILD_TABLE || tbType == TSDB_VIRTUAL_NORMAL_TABLE)) {
2,028✔
752
    return true;
5,164✔
753
  }
754
  return false;
×
755
}
756

757
void getAlterColId(void* pVnode, int64_t uid, const char* colName, col_id_t* colId) {
460✔
758
  SSchemaWrapper *pSchema = metaGetTableSchema(((SVnode *)pVnode)->pMeta, uid, -1, 1, NULL, 0);
460✔
759
  if (pSchema == NULL) {
460✔
760
    return;
×
761
  }
762
  for (int32_t i = 0; i < pSchema->nCols; i++) {
1,150✔
763
    if (strncmp(pSchema->pSchema[i].name, colName, TSDB_COL_NAME_LEN) == 0) {
1,150✔
764
      *colId = pSchema->pSchema[i].colId;
460✔
765
      break;
460✔
766
    }
767
  }
768
  tDeleteSchemaWrapper(pSchema);
769
  return;
460✔
770
}
771

772
static bool checkAlterCondition() {
×
773
  return true;
×
774
}
775

776
// Handle TSDB_ALTER_TABLE_ALTER_COLUMN_REF and TSDB_ALTER_TABLE_REMOVE_COLUMN_REF
777
static int32_t scanAlterTableColumnRef(SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* rsp, 
5,164✔
778
                                       SVAlterTbReq* pReq, uint64_t uid, int64_t ver) {
779
  int32_t code = 0;
5,164✔
780
  int32_t lino = 0;
5,164✔
781
  void* pTask = sStreamReaderInfo->pTask;
5,164✔
782
  SArray* uidListAdd = NULL;
5,164✔
783

784
  uidListAdd = taosArrayInit(8, sizeof(tb_uid_t));
5,164✔
785
  STREAM_CHECK_NULL_GOTO(uidListAdd, terrno);
5,164✔
786

787
  uint64_t id = 0;
5,164✔
788
  STREAM_CHECK_CONDITION_GOTO(!uidInTableListOrigin(sStreamReaderInfo, sStreamReaderInfo->suid, uid, &id), TDB_CODE_SUCCESS);
5,164✔
789

790
  col_id_t colId = 0;
460✔
791
  getAlterColId(sStreamReaderInfo->pVnode, uid, pReq->colName, &colId);
460✔
792
  if (atomic_load_8(&sStreamReaderInfo->isVtableOnlyTs) == 0 && !isColIdInList(sStreamReaderInfo->triggerCols, colId)) {
460✔
793
    ST_TASK_ILOG("stream reader scan alter table %s, colId %d not in trigger cols", pReq->tbName, colId);
×
794
    goto end;
×
795
  }
796

797
  STREAM_CHECK_NULL_GOTO(taosArrayPush(uidListAdd, &uid), terrno);
460✔
798
  STREAM_CHECK_RET_GOTO(addUidListToBlock(uidListAdd, &rsp->tableBlock, ver, &rsp->totalRows, TABLE_BLOCK_ADD));
460✔
799

800
  ST_TASK_DLOG("stream reader scan alter table column ref %s", pReq->tbName);
460✔
801

802
end:
5,164✔
803
  taosArrayDestroy(uidListAdd);
5,164✔
804
  STREAM_PRINT_LOG_END_WITHID(code, lino);
5,164✔
805
  return code;
5,164✔
806
}
807

808
static int32_t checkAlter(SStreamTriggerReaderInfo* sStreamReaderInfo, char* tbName, int8_t action, uint64_t *uid) {
25,520✔
809
  int32_t  code = 0;
25,520✔
810
  int32_t  lino = 0;
25,520✔
811
  ETableType tbType = 0;
25,520✔
812
  uint64_t suid = 0;
25,520✔
813

814
  STREAM_CHECK_RET_GOTO(metaGetTableTypeSuidByName(sStreamReaderInfo->pVnode, tbName, &tbType, &suid));
25,520✔
815
  STREAM_CHECK_CONDITION_GOTO(!isAlteredTable(action, tbType), TDB_CODE_SUCCESS);
25,520✔
816
  STREAM_CHECK_CONDITION_GOTO(suid != sStreamReaderInfo->suid, TDB_CODE_SUCCESS);
25,520✔
817
  if (action == TSDB_ALTER_TABLE_UPDATE_CHILD_TABLE_TAG_VAL) {
20,140✔
818
    *uid = suid;
×
819
    goto end;
×
820
  }
821
  STREAM_CHECK_RET_GOTO(metaGetTableUidByName(sStreamReaderInfo->pVnode, tbName, uid));
20,140✔
822

823
end:
25,520✔
824
  return code;
25,520✔
825
}
826

827
static SArray* getTableListForAlterSuperTable(SStreamTriggerReaderInfo* sStreamReaderInfo, SVAlterTbReq* pReq){
20,356✔
828
  int32_t code = 0;
20,356✔
829
  int32_t lino = 0;
20,356✔
830
  void* pTask = sStreamReaderInfo->pTask;
20,356✔
831
  SArray* uidList = taosArrayInit(8, sizeof(tb_uid_t));
20,356✔
832
  STREAM_CHECK_NULL_GOTO(uidList, terrno);
20,181✔
833
  for (int32_t i = 0; i < taosArrayGetSize(pReq->tables); i++) {
40,537✔
834
    SUpdateTableTagVal *pTable = taosArrayGet(pReq->tables, i);
20,356✔
835
    uint64_t uid = 0;
20,356✔
836
    code = checkAlter(sStreamReaderInfo, pTable->tbName, pReq->action, &uid);
20,356✔
837
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
20,356✔
838
      code = 0;
×
839
      ST_TASK_WLOG("stream reader scan alter table %s not exist, metaGetTableUidByName", pTable->tbName);
×
840
      continue;
×
841
    }
842
    STREAM_CHECK_RET_GOTO(code);
20,356✔
843
    STREAM_CHECK_NULL_GOTO(taosArrayPush(uidList, (const void *)&uid), terrno);
20,356✔
844
  }
845

846
end:
20,356✔
847
  if (code != 0) {
20,356✔
848
    ST_TASK_ELOG("%s failed,code:%d", __func__, code);
×
849
    taosArrayDestroy(uidList);
×
850
    uidList = NULL;
×
851
  }
852
  return uidList;
20,356✔
853
}
854

855
// Handle TSDB_ALTER_TABLE_UPDATE_CHILD_TABLE_TAG_VAL and TSDB_ALTER_TABLE_UPDATE_MULTI_TABLE_TAG_VAL
856
static int32_t scanAlterTableTagVal(SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* rsp, 
20,356✔
857
                                    SArray* uidList, int64_t ver) {
858
  int32_t code = 0;
20,356✔
859
  int32_t lino = 0;
20,356✔
860
  void* pTask = sStreamReaderInfo->pTask;
20,356✔
861
  SArray* uidListAdd = NULL;
20,356✔
862
  SArray* uidListDel = NULL;
20,356✔
863
  SArray* tableList = NULL;
20,356✔
864

865
  if (sStreamReaderInfo->isVtableStream) {
20,356✔
866
    uidListAdd = taosArrayInit(8, sizeof(tb_uid_t));
1,632✔
867
    STREAM_CHECK_NULL_GOTO(uidListAdd, terrno);
1,632✔
868
  }
869

870
  uidListDel = taosArrayInit(8, sizeof(tb_uid_t));
20,356✔
871
  STREAM_CHECK_NULL_GOTO(uidListDel, terrno);
20,356✔
872

873
  STREAM_CHECK_RET_GOTO(processTableList(sStreamReaderInfo, uidList, &tableList));
20,356✔
874
  STREAM_CHECK_RET_GOTO(qStreamGetDelTable(sStreamReaderInfo, uidList, uidListDel));
20,356✔
875

876
  if (rsp->checkAlter && taosArrayGetSize(uidListDel) > 0 && rsp->totalDataRows > 0) {
20,356✔
877
    rsp->needReturn = true;
×
878
    rsp->ver--;
×
879
    ST_TASK_DLOG("%s stream reader scan alter table need return data", __func__);
×
880
    goto end;
×
881
  }
882

883
  STREAM_CHECK_RET_GOTO(qStreamGetAddTable(sStreamReaderInfo, tableList, uidListAdd));
20,356✔
884
  if (sStreamReaderInfo->isVtableStream) {
20,356✔
885
    STREAM_CHECK_RET_GOTO(addUidListToBlock(uidListAdd, &rsp->tableBlock, ver, &rsp->totalRows, TABLE_BLOCK_ADD));
1,632✔
886
    STREAM_CHECK_RET_GOTO(addUidListToBlock(uidListDel, &rsp->tableBlock, ver, &rsp->totalRows, TABLE_BLOCK_RETIRE));
1,632✔
887
  }
888
  STREAM_CHECK_RET_GOTO(qStreamModifyTableList(sStreamReaderInfo, tableList, uidList));
20,356✔
889

890
  ST_TASK_DLOG("%s stream reader scan alter table tag val", __func__);
20,356✔
891

892
end:
20,356✔
893
  taosArrayDestroy(uidListAdd);
20,356✔
894
  taosArrayDestroy(uidListDel);
20,356✔
895
  taosArrayDestroy(tableList);
20,356✔
896
  STREAM_PRINT_LOG_END_WITHID(code, lino);
20,356✔
897
  return code;
20,356✔
898
}
899

900
static int32_t scanAlterTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* rsp, void* data, int32_t len, int64_t ver) {
35,264✔
901
  int32_t  code = 0;
35,264✔
902
  int32_t  lino = 0;
35,264✔
903
  SDecoder decoder = {0};
35,264✔
904
  void* pTask = sStreamReaderInfo->pTask;
35,264✔
905
  SArray* uidList = NULL;
35,264✔
906

907
  ST_TASK_DLOG("%s start", __func__);
35,264✔
908

909
  SVAlterTbReq req = {0};
35,264✔
910
  tDecoderInit(&decoder, data, len);
35,264✔
911
  
912
  STREAM_CHECK_RET_GOTO(tDecodeSVAlterTbReq(&decoder, &req));
35,264✔
913

914
  STREAM_CHECK_CONDITION_GOTO(req.action != TSDB_ALTER_TABLE_UPDATE_MULTI_TABLE_TAG_VAL && req.action != TSDB_ALTER_TABLE_UPDATE_CHILD_TABLE_TAG_VAL && 
35,264✔
915
    req.action != TSDB_ALTER_TABLE_ALTER_COLUMN_REF && req.action != TSDB_ALTER_TABLE_REMOVE_COLUMN_REF, TDB_CODE_SUCCESS);
916

917
  uint64_t uid = 0;
25,520✔
918
  if (req.action == TSDB_ALTER_TABLE_ALTER_COLUMN_REF || req.action == TSDB_ALTER_TABLE_REMOVE_COLUMN_REF) {
25,520✔
919
    code = checkAlter(sStreamReaderInfo, req.tbName, req.action, &uid);
5,164✔
920
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
5,164✔
921
      ST_TASK_WLOG("stream reader scan alter table %s not exist, metaGetTableUidByName", req.tbName);
×
922
      code = 0;
×
923
      goto end;
×
924
    }
925
    STREAM_CHECK_RET_GOTO(scanAlterTableColumnRef(sStreamReaderInfo, rsp, &req, uid, ver));
5,164✔
926
  } else if (req.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TABLE_TAG_VAL) {
20,356✔
927
    uidList = getTableListForAlterSuperTable(sStreamReaderInfo, &req);
20,356✔
928
    STREAM_CHECK_NULL_GOTO(uidList, terrno);
20,356✔
929
    STREAM_CHECK_RET_GOTO(scanAlterTableTagVal(sStreamReaderInfo, rsp, uidList, ver));
20,356✔
930
  } else if (req.action == TSDB_ALTER_TABLE_UPDATE_CHILD_TABLE_TAG_VAL) {
×
931
    code = checkAlter(sStreamReaderInfo, req.tbName, req.action, &uid);
×
932
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
×
933
      ST_TASK_WLOG("stream reader scan alter table %s not exist, metaGetTableUidByName", req.tbName);
×
934
      code = 0;
×
935
      goto end;
×
936
    }
937
    uidList = taosArrayInit(8, sizeof(uint64_t));
×
938
    STREAM_CHECK_NULL_GOTO(uidList, terrno);
×
939
    STREAM_CHECK_RET_GOTO(vnodeGetCtbIdList(sStreamReaderInfo->pVnode, uid, uidList));
×
940
    STREAM_CHECK_RET_GOTO(scanAlterTableTagVal(sStreamReaderInfo, rsp, uidList, ver));
×
941
  }
942

943
  ST_TASK_DLOG("%s stream reader scan alter table", __func__);
25,520✔
944

945
end:
35,264✔
946
  destroyAlterTbReq(&req);
35,264✔
947

948
  taosArrayDestroy(uidList);
35,264✔
949
  tDecoderClear(&decoder);
35,264✔
950
  STREAM_PRINT_LOG_END_WITHID(code, lino);
35,264✔
951
  return code;
35,264✔
952
}
953

954
// static int32_t scanAlterSTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, void* data, int32_t len) {
955
//   int32_t  code = 0;
956
//   int32_t  lino = 0;
957
//   SDecoder decoder = {0};
958
//   SMAlterStbReq reqAlter = {0};
959
//   SVCreateStbReq req = {0};
960
//   tDecoderInit(&decoder, data, len);
961
//   void* pTask = sStreamReaderInfo->pTask;
962
  
963
//   STREAM_CHECK_RET_GOTO(tDecodeSVCreateStbReq(&decoder, &req));
964
//   STREAM_CHECK_CONDITION_GOTO(req.suid != sStreamReaderInfo->suid, TDB_CODE_SUCCESS);
965
//   if (req.alterOriData != 0) {
966
//     STREAM_CHECK_RET_GOTO(tDeserializeSMAlterStbReq(req.alterOriData, req.alterOriDataLen, &reqAlter));
967
//     STREAM_CHECK_CONDITION_GOTO(reqAlter.alterType != TSDB_ALTER_TABLE_DROP_TAG && reqAlter.alterType != TSDB_ALTER_TABLE_UPDATE_TAG_NAME, TDB_CODE_SUCCESS);
968
//   }
969
  
970
//   STREAM_CHECK_RET_GOTO(processTableList(sStreamReaderInfo));
971

972
//   ST_TASK_DLOG("stream reader scan alter suid %" PRId64, req.suid);
973
// end:
974
//   tFreeSMAltertbReq(&reqAlter);
975
//   tDecoderClear(&decoder);
976
//   return code;
977
// }
978

979
// static int32_t scanDropSTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, void* data, int32_t len) {
980
//   int32_t  code = 0;
981
//   int32_t  lino = 0;
982
//   SDecoder decoder = {0};
983
//   void* pTask = sStreamReaderInfo->pTask;
984

985
//   SVDropStbReq req = {0};
986
//   tDecoderInit(&decoder, data, len);
987
//   STREAM_CHECK_RET_GOTO(tDecodeSVDropStbReq(&decoder, &req));
988
//   STREAM_CHECK_CONDITION_GOTO(req.suid != sStreamReaderInfo->suid, TDB_CODE_SUCCESS);
989

990
//   ST_TASK_DLOG("stream reader scan drop suid %" PRId64, req.suid);
991
// end:
992
//   tDecoderClear(&decoder);
993
//   return code;
994
// }
995

996
static int32_t scanSubmitTbDataForMeta(SDecoder *pCoder, SStreamTriggerReaderInfo* sStreamReaderInfo, SSHashObj* gidHash, int64_t ver) {
39,423,251✔
997
  int32_t code = 0;
39,423,251✔
998
  int32_t lino = 0;
39,423,251✔
999
  WalMetaResult walMeta = {0};
39,423,251✔
1000
  SSubmitTbData submitTbData = {0};
39,426,751✔
1001
  
1002
  if (tStartDecode(pCoder) < 0) {
39,427,456✔
1003
    code = TSDB_CODE_INVALID_MSG;
×
1004
    TSDB_CHECK_CODE(code, lino, end);
×
1005
  }
1006

1007
  uint8_t       version = 0;
39,429,687✔
1008
  if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) {
39,430,031✔
1009
    code = TSDB_CODE_INVALID_MSG;
×
1010
    TSDB_CHECK_CODE(code, lino, end);
×
1011
  }
1012
  version = (submitTbData.flags >> 8) & 0xff;
39,430,031✔
1013
  submitTbData.flags = submitTbData.flags & 0xff;
39,430,031✔
1014

1015
  // STREAM_CHECK_CONDITION_GOTO(version < 2, TDB_CODE_SUCCESS);
1016
  if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
39,430,031✔
1017
    submitTbData.pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
4,020,200✔
1018
    STREAM_CHECK_NULL_GOTO(submitTbData.pCreateTbReq, terrno);
4,020,200✔
1019
    STREAM_CHECK_RET_GOTO(tDecodeSVCreateTbReq(pCoder, submitTbData.pCreateTbReq));
4,020,200✔
1020
    STREAM_CHECK_RET_GOTO(processAutoCreateTableNew(sStreamReaderInfo, submitTbData.pCreateTbReq, ver));
4,020,200✔
1021
  }
1022

1023
  // submit data
1024
  if (tDecodeI64(pCoder, &submitTbData.suid) < 0) {
39,384,989✔
1025
    code = TSDB_CODE_INVALID_MSG;
×
1026
    TSDB_CHECK_CODE(code, lino, end);
×
1027
  }
1028
  if (tDecodeI64(pCoder, &submitTbData.uid) < 0) {
39,426,901✔
1029
    code = TSDB_CODE_INVALID_MSG;
×
1030
    TSDB_CHECK_CODE(code, lino, end);
×
1031
  }
1032

1033
  if (!uidInTableListSet(sStreamReaderInfo, submitTbData.suid, submitTbData.uid, &walMeta.id, false)){
39,426,901✔
1034
    goto end;
31,322,121✔
1035
  }
1036
  if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) {
8,108,332✔
1037
    code = TSDB_CODE_INVALID_MSG;
×
1038
    TSDB_CHECK_CODE(code, lino, end);
×
1039
  }
1040

1041
  if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
8,108,332✔
1042
    uint64_t nColData = 0;
×
1043
    if (tDecodeU64v(pCoder, &nColData) < 0) {
×
1044
      code = TSDB_CODE_INVALID_MSG;
×
1045
      TSDB_CHECK_CODE(code, lino, end);
×
1046
    }
1047

1048
    SColData colData = {0};
×
1049
    code = tDecodeColData(version, pCoder, &colData, false);
×
1050
    if (code) {
×
1051
      code = TSDB_CODE_INVALID_MSG;
×
1052
      TSDB_CHECK_CODE(code, lino, end);
×
1053
    }
1054

1055
    if (colData.flag != HAS_VALUE) {
×
1056
      code = TSDB_CODE_INVALID_MSG;
×
1057
      TSDB_CHECK_CODE(code, lino, end);
×
1058
    }
1059
    walMeta.skey = ((TSKEY *)colData.pData)[0];
×
1060
    walMeta.ekey = ((TSKEY *)colData.pData)[colData.nVal - 1];
×
1061

1062
    for (uint64_t i = 1; i < nColData; i++) {
×
1063
      code = tDecodeColData(version, pCoder, &colData, true);
×
1064
      if (code) {
×
1065
        code = TSDB_CODE_INVALID_MSG;
×
1066
        TSDB_CHECK_CODE(code, lino, end);
×
1067
      }
1068
    }
1069
  } else {
1070
    uint64_t nRow = 0;
8,108,332✔
1071
    if (tDecodeU64v(pCoder, &nRow) < 0) {
8,108,567✔
1072
      code = TSDB_CODE_INVALID_MSG;
×
1073
      TSDB_CHECK_CODE(code, lino, end);
×
1074
    }
1075

1076
    for (int32_t iRow = 0; iRow < nRow; ++iRow) {
17,352,474✔
1077
      SRow *pRow = (SRow *)(pCoder->data + pCoder->pos);
9,243,225✔
1078
      pCoder->pos += pRow->len;
9,243,672✔
1079
      if (iRow == 0){
9,243,672✔
1080
#ifndef NO_UNALIGNED_ACCESS
1081
        walMeta.skey = pRow->ts;
8,108,567✔
1082
#else
1083
        walMeta.skey = taosGetInt64Aligned(&pRow->ts);
1084
#endif
1085
      }
1086
      if (iRow == nRow - 1) {
9,243,672✔
1087
#ifndef NO_UNALIGNED_ACCESS
1088
        walMeta.ekey = pRow->ts;
8,108,567✔
1089
#else
1090
        walMeta.ekey = taosGetInt64Aligned(&pRow->ts);
1091
#endif
1092
      }
1093
    }
1094
  }
1095

1096
  WalMetaResult* data = (WalMetaResult*)tSimpleHashGet(gidHash, &walMeta.id, LONG_BYTES);
8,108,567✔
1097
  if (data != NULL) {
8,108,332✔
1098
    if (walMeta.skey < data->skey) data->skey = walMeta.skey;
956✔
1099
    if (walMeta.ekey > data->ekey) data->ekey = walMeta.ekey;
956✔
1100
  } else {
1101
    STREAM_CHECK_RET_GOTO(tSimpleHashPut(gidHash, &walMeta.id, LONG_BYTES, &walMeta, sizeof(WalMetaResult)));
8,107,376✔
1102
  }
1103

1104
end:
39,423,014✔
1105
  tDestroySVSubmitCreateTbReq(submitTbData.pCreateTbReq, TSDB_MSG_FLG_DECODE);
39,428,795✔
1106
  taosMemoryFreeClear(submitTbData.pCreateTbReq);
39,427,367✔
1107
  tEndDecode(pCoder);
39,427,367✔
1108
  return code;
39,428,185✔
1109
}
1110

1111
static int32_t scanSubmitDataForMeta(SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* rsp, void* data, int32_t len, int64_t ver) {
39,426,372✔
1112
  int32_t  code = 0;
39,426,372✔
1113
  int32_t  lino = 0;
39,426,372✔
1114
  SDecoder decoder = {0};
39,426,372✔
1115
  SSHashObj* gidHash = NULL;
39,426,811✔
1116
  void* pTask = sStreamReaderInfo->pTask;
39,426,811✔
1117

1118
  tDecoderInit(&decoder, data, len);
39,426,811✔
1119
  if (tStartDecode(&decoder) < 0) {
39,427,133✔
1120
    code = TSDB_CODE_INVALID_MSG;
×
1121
    TSDB_CHECK_CODE(code, lino, end);
×
1122
  }
1123

1124
  uint64_t nSubmitTbData = 0;
39,427,717✔
1125
  if (tDecodeU64v(&decoder, &nSubmitTbData) < 0) {
39,425,510✔
1126
    code = TSDB_CODE_INVALID_MSG;
×
1127
    TSDB_CHECK_CODE(code, lino, end);
×
1128
  }
1129

1130
  gidHash = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
39,425,510✔
1131
  STREAM_CHECK_NULL_GOTO(gidHash, terrno);
39,421,242✔
1132

1133
  for (uint64_t i = 0; i < nSubmitTbData; i++) {
78,852,165✔
1134
    STREAM_CHECK_RET_GOTO(scanSubmitTbDataForMeta(&decoder, sStreamReaderInfo, gidHash, ver));
39,417,954✔
1135
  }
1136
  tEndDecode(&decoder);
39,434,211✔
1137

1138
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->metaBlock, ((SSDataBlock*)rsp->metaBlock)->info.rows + tSimpleHashGetSize(gidHash)));
39,429,285✔
1139
  int32_t iter = 0;
39,427,373✔
1140
  void*   px = tSimpleHashIterate(gidHash, NULL, &iter);
39,429,113✔
1141
  while (px != NULL) {
47,535,079✔
1142
    WalMetaResult* pMeta = (WalMetaResult*)px;
8,107,436✔
1143
    STREAM_CHECK_RET_GOTO(buildWalMetaBlockNew(rsp->metaBlock, pMeta->id, pMeta->skey, pMeta->ekey, ver));
8,107,436✔
1144
    ((SSDataBlock*)rsp->metaBlock)->info.rows++;
8,107,607✔
1145
    rsp->totalRows++;
8,107,607✔
1146
    ST_TASK_DLOG("stream reader scan submit data:skey %" PRId64 ", ekey %" PRId64 ", id %" PRIu64
8,107,671✔
1147
          ", ver:%"PRId64, pMeta->skey, pMeta->ekey, pMeta->id, ver);
1148
    px = tSimpleHashIterate(gidHash, px, &iter);
8,107,671✔
1149
  }
1150
end:
39,427,643✔
1151
  tDecoderClear(&decoder);
39,427,643✔
1152
  tSimpleHashCleanup( gidHash);
39,428,501✔
1153
  return code;
39,424,283✔
1154
}
1155

1156
static int32_t createBlockForTsdbMeta(SSDataBlock** pBlock, bool isVTable) {
440,048✔
1157
  int32_t code = 0;
440,048✔
1158
  int32_t lino = 0;
440,048✔
1159
  SArray* schemas = taosArrayInit(8, sizeof(SSchema));
440,048✔
1160
  STREAM_CHECK_NULL_GOTO(schemas, terrno);
440,048✔
1161

1162
  int32_t index = 1;
440,048✔
1163
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, index++))  // skey
440,048✔
1164
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, index++))  // ekey
440,048✔
1165
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // uid
440,048✔
1166
  if (!isVTable) {
440,048✔
1167
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_UBIGINT, LONG_BYTES, index++))  // gid
47,581✔
1168
  }
1169
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))     // nrows
440,048✔
1170

1171
  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, pBlock));
440,048✔
1172

1173
end:
440,048✔
1174
  taosArrayDestroy(schemas);
440,048✔
1175
  return code;
440,048✔
1176
}
1177

1178
static int32_t createBlockForWalMetaNew(SSDataBlock** pBlock) {
304,020✔
1179
  int32_t code = 0;
304,020✔
1180
  int32_t lino = 0;
304,020✔
1181
  SArray* schemas = NULL;
304,020✔
1182

1183
  schemas = taosArrayInit(8, sizeof(SSchema));
304,020✔
1184
  STREAM_CHECK_NULL_GOTO(schemas, terrno);
304,252✔
1185

1186
  int32_t index = 0;
304,252✔
1187
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // gid non vtable/uid vtable
304,252✔
1188
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // skey
304,020✔
1189
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // ekey
304,252✔
1190
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // ver
304,252✔
1191

1192
  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, pBlock));
304,252✔
1193

1194
end:
304,252✔
1195
  taosArrayDestroy(schemas);
304,252✔
1196
  return code;
304,252✔
1197
}
1198

1199
static int32_t processMeta(int16_t msgType, SStreamTriggerReaderInfo* sStreamReaderInfo, void *data, int32_t len, SSTriggerWalNewRsp* rsp, int64_t ver) {
541,817✔
1200
  int32_t code = 0;
541,817✔
1201
  int32_t lino = 0;
541,817✔
1202
  void* pTask = sStreamReaderInfo->pTask;
541,817✔
1203

1204
  ST_TASK_DLOG("%s check meta msg, stream ver:%" PRId64 ", wal ver:%" PRId64, __func__, sStreamReaderInfo->tableList.version, ver);
541,817✔
1205

1206
  SDecoder dcoder = {0};
541,817✔
1207
  tDecoderInit(&dcoder, data, len);
541,817✔
1208
  if (msgType == TDMT_VND_DELETE && sStreamReaderInfo->deleteReCalc != 0) {
541,817✔
1209
    if (rsp->deleteBlock == NULL) {
30,136✔
1210
      STREAM_CHECK_RET_GOTO(createBlockForWalMetaNew((SSDataBlock**)&rsp->deleteBlock));
10,732✔
1211
    }
1212
      
1213
    STREAM_CHECK_RET_GOTO(scanDeleteDataNew(sStreamReaderInfo, rsp, data, len, ver));
30,136✔
1214
  } else if (msgType == TDMT_VND_DROP_TABLE && 
511,681✔
1215
    (sStreamReaderInfo->deleteOutTbl != 0 || sStreamReaderInfo->isVtableStream)) {
50,741✔
1216
    STREAM_CHECK_RET_GOTO(scanDropTableNew(sStreamReaderInfo, rsp, data, len, ver));
33,028✔
1217
  // } else if (msgType == TDMT_VND_DROP_STB) {
1218
  //   STREAM_CHECK_RET_GOTO(scanDropSTableNew(sStreamReaderInfo, data, len));
1219
  } else if (msgType == TDMT_VND_CREATE_TABLE && !ignoreMetaChange(sStreamReaderInfo->tableList.version, ver)) {
478,653✔
1220
    STREAM_CHECK_RET_GOTO(scanCreateTableNew(sStreamReaderInfo, rsp, data, len, ver));
25,114✔
1221
  } else if (msgType == TDMT_VND_ALTER_STB && !ignoreMetaChange(sStreamReaderInfo->tableList.version, ver)) {
453,539✔
1222
    // STREAM_CHECK_RET_GOTO(scanAlterSTableNew(sStreamReaderInfo, data, len));
1223
  } else if (msgType == TDMT_VND_ALTER_TABLE && !ignoreMetaChange(sStreamReaderInfo->tableList.version, ver)) {
426,721✔
1224
    STREAM_CHECK_RET_GOTO(scanAlterTableNew(sStreamReaderInfo, rsp, data, len, ver));
35,264✔
1225
  }
1226

1227
end:
541,817✔
1228
  tDecoderClear(&dcoder);
541,817✔
1229
  return code;
541,817✔
1230
}
1231
static int32_t processWalVerMetaNew(SVnode* pVnode, SSTriggerWalNewRsp* rsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
11,992,537✔
1232
                       int64_t ctime) {
1233
  int32_t code = 0;
11,992,537✔
1234
  int32_t lino = 0;
11,992,537✔
1235
  void* pTask = sStreamReaderInfo->pTask;
11,992,537✔
1236

1237
  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
11,992,537✔
1238
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);
11,990,842✔
1239
  code = walReaderSeekVer(pWalReader, rsp->ver);
11,990,842✔
1240
  if (code == TSDB_CODE_WAL_LOG_NOT_EXIST){
11,990,381✔
1241
    if (rsp->ver < walGetFirstVer(pWalReader->pWal)) {
6,652,884✔
1242
      rsp->ver = walGetFirstVer(pWalReader->pWal);
×
1243
      rsp->verTime = 0;
×
1244
    } else {
1245
      rsp->verTime = taosGetTimestampUs();
6,652,677✔
1246
    }
1247
    ST_TASK_DLOG("vgId:%d %s scan wal end:%s", TD_VID(pVnode), __func__, tstrerror(code));
6,651,673✔
1248
    code = TSDB_CODE_SUCCESS;
6,653,256✔
1249
    goto end;
6,653,256✔
1250
  }
1251
  STREAM_CHECK_RET_GOTO(code);
5,337,497✔
1252

1253
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->metaBlock, STREAM_RETURN_ROWS_NUM));
5,337,497✔
1254
  while (1) {
39,856,612✔
1255
    code = walNextValidMsg(pWalReader, true);
45,194,064✔
1256
    if (code == TSDB_CODE_WAL_LOG_NOT_EXIST){
45,195,397✔
1257
      rsp->verTime = taosGetTimestampUs();
5,337,917✔
1258
      ST_TASK_DLOG("vgId:%d %s scan wal end:%s", TD_VID(pVnode), __func__, tstrerror(code));
5,337,917✔
1259
      code = TSDB_CODE_SUCCESS;
5,338,121✔
1260
      goto end;
5,338,121✔
1261
    }
1262
    STREAM_CHECK_RET_GOTO(code);
39,856,747✔
1263
    rsp->ver = pWalReader->curVersion;
39,856,747✔
1264
    SWalCont* wCont = &pWalReader->pHead->head;
39,858,046✔
1265
    rsp->verTime = wCont->ingestTs;
39,858,443✔
1266
    if (wCont->ingestTs / 1000 > ctime) break;
39,857,993✔
1267
    void*   data = POINTER_SHIFT(wCont->body, sizeof(SMsgHead));
39,857,142✔
1268
    int32_t len = wCont->bodyLen - sizeof(SMsgHead);
39,857,360✔
1269
    int64_t ver = wCont->version;
39,857,805✔
1270

1271
    ST_TASK_DLOG("vgId:%d stream reader scan wal ver:%" PRId64 "/%" PRId64 ", type:%s, deleteData:%d, deleteTb:%d",
39,856,938✔
1272
      TD_VID(pVnode), ver, walGetAppliedVer(pWalReader->pWal), TMSG_INFO(wCont->msgType), sStreamReaderInfo->deleteReCalc, sStreamReaderInfo->deleteOutTbl);
1273
    if (wCont->msgType == TDMT_VND_SUBMIT) {
39,860,445✔
1274
      data = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
39,428,014✔
1275
      len = wCont->bodyLen - sizeof(SSubmitReq2Msg);
39,428,014✔
1276
      STREAM_CHECK_RET_GOTO(scanSubmitDataForMeta(sStreamReaderInfo, rsp, data, len, ver));
39,426,780✔
1277
    } else {
1278
      STREAM_CHECK_RET_GOTO(processMeta(wCont->msgType, sStreamReaderInfo, data, len, rsp, ver));
430,581✔
1279
    }
1280

1281
    if (rsp->totalRows >= STREAM_RETURN_ROWS_NUM) {
39,853,409✔
1282
      break;
×
1283
    }
1284
  }
1285

1286
end:
11,991,377✔
1287
  walCloseReader(pWalReader);
11,991,377✔
1288
  return code;
11,990,941✔
1289
}
1290

1291
int32_t cacheTag(SVnode* pVnode, SHashObj* metaCache, SExprInfo* pExprInfo, int32_t numOfExpr, SStorageAPI* api, uint64_t uid, col_id_t colId, SRWLatch* lock) {
41,602,441✔
1292
  int32_t     code = 0;
41,602,441✔
1293
  int32_t     lino = 0;
41,602,441✔
1294
  SMetaReader mr = {0};
41,602,441✔
1295
  SArray* tagCache = NULL;
41,608,060✔
1296
  char* data = NULL;
41,610,442✔
1297

1298
  if (lock != NULL) taosWLockLatch(lock);
41,609,189✔
1299
  STREAM_CHECK_CONDITION_GOTO(numOfExpr == 0, code);
41,618,617✔
1300
  stDebug("%s start,uid:%"PRIu64, __func__, uid);
2,271,497✔
1301
  void* uidData = taosHashGet(metaCache, &uid, LONG_BYTES);
2,272,151✔
1302
  if (uidData == NULL) {
2,272,151✔
1303
    tagCache = taosArrayInit(numOfExpr, POINTER_BYTES);
2,253,358✔
1304
    STREAM_CHECK_NULL_GOTO(tagCache, terrno);
2,253,358✔
1305
    if(taosHashPut(metaCache, &uid, LONG_BYTES, &tagCache, POINTER_BYTES) != 0) {
2,253,358✔
1306
      taosArrayDestroy(tagCache);
×
1307
      code = terrno;
×
1308
      goto end;
×
1309
    }
1310
  } else {
1311
    tagCache = *(SArray**)uidData;
18,793✔
1312
    stDebug("%s found tagCache, size:%zu %d, uid:%"PRIu64, __func__, taosArrayGetSize(tagCache), numOfExpr, uid);
18,793✔
1313
    STREAM_CHECK_CONDITION_GOTO(taosArrayGetSize(tagCache) != numOfExpr, TSDB_CODE_INVALID_PARA);
18,793✔
1314
  }
1315
  
1316
  api->metaReaderFn.initReader(&mr, pVnode, META_READER_LOCK, &api->metaFn);
2,272,151✔
1317
  code = api->metaReaderFn.getEntryGetUidCache(&mr, uid);
2,272,151✔
1318
  api->metaReaderFn.readerReleaseLock(&mr);
2,270,251✔
1319
  STREAM_CHECK_RET_GOTO(code);
2,270,573✔
1320
  
1321
  for (int32_t j = 0; j < numOfExpr; ++j) {
7,867,668✔
1322
    const SExprInfo* pExpr1 = &pExprInfo[j];
5,602,809✔
1323
    int32_t functionId = pExpr1->pExpr->_function.functionId;
5,602,041✔
1324
    col_id_t cid = 0;
5,604,760✔
1325
    // this is to handle the tbname
1326
    if (fmIsScanPseudoColumnFunc(functionId)) {
5,604,760✔
1327
      int32_t fType = pExpr1->pExpr->_function.functionType;
617,113✔
1328
      if (fType == FUNCTION_TYPE_TBNAME) {
617,387✔
1329
        data = taosMemoryCalloc(1, strlen(mr.me.name) + VARSTR_HEADER_SIZE);
617,387✔
1330
        STREAM_CHECK_NULL_GOTO(data, terrno);
617,147✔
1331
        STR_TO_VARSTR(data, mr.me.name)
617,147✔
1332
      }
1333
      cid = -1;
617,148✔
1334
    } else {  // these are tags
1335
      const char* p = NULL;
4,986,218✔
1336
      char* pData = NULL;
4,986,218✔
1337
      int8_t type = pExpr1->base.resSchema.type;
4,986,218✔
1338
      int32_t len = pExpr1->base.resSchema.bytes;
4,986,598✔
1339
      STagVal tagVal = {0};
4,986,598✔
1340
      tagVal.cid = pExpr1->base.pParam[0].pCol->colId;
4,987,358✔
1341
      cid = tagVal.cid;
4,987,373✔
1342
      if (colId != 0 && cid != colId) {
4,987,373✔
1343
        continue;
1,021✔
1344
      }
1345
      p = api->metaFn.extractTagVal(mr.me.ctbEntry.pTags, type, &tagVal);
4,986,352✔
1346

1347
      if (type != TSDB_DATA_TYPE_JSON && p != NULL) {
4,984,832✔
1348
        pData = tTagValToData((const STagVal*)p, false);
4,982,207✔
1349
      } else {
1350
        pData = (char*)p;
2,625✔
1351
      }
1352

1353
      if (pData != NULL && (type == TSDB_DATA_TYPE_JSON || !IS_VAR_DATA_TYPE(type))) {
4,983,327✔
1354
        if (type == TSDB_DATA_TYPE_JSON) {
2,489,668✔
1355
          len = getJsonValueLen(pData);
×
1356
        }
1357
        data = taosMemoryCalloc(1, len);
2,489,668✔
1358
        STREAM_CHECK_NULL_GOTO(data, terrno);
2,490,033✔
1359
        (void)memcpy(data, pData, len);
2,490,033✔
1360
      } else {
1361
        data = pData;
2,493,659✔
1362
      }
1363
    }
1364
    if (uidData == NULL){
5,600,201✔
1365
      STREAM_CHECK_NULL_GOTO(taosArrayPush(tagCache, &data), terrno);
11,130,443✔
1366
    } else {
1367
      void* pre = taosArrayGetP(tagCache, j);
36,361✔
1368
      taosMemoryFree(pre);
36,361✔
1369
      taosArraySet(tagCache, j, &data);
36,361✔
1370
    }
1371
    data = NULL;
5,602,204✔
1372
  }
1373

1374
end:
41,617,448✔
1375
  taosMemoryFree(data);
41,603,967✔
1376
  api->metaReaderFn.clearReader(&mr);
41,610,508✔
1377
  if (lock != NULL) taosWUnLockLatch(lock);
41,599,344✔
1378
  return code;
41,602,946✔
1379
}
1380

1381
int32_t fillTag(SHashObj* metaCache, SExprInfo* pExprInfo, int32_t numOfExpr,
108,235,756✔
1382
                uint64_t uid, SSDataBlock* pBlock, uint32_t currentRow, uint32_t numOfRows, uint32_t numOfBlocks, SRWLatch* lock) {
1383
  int32_t     code = 0;
108,235,756✔
1384
  int32_t     lino = 0;
108,235,756✔
1385
  SArray* tagCache = NULL;
108,235,756✔
1386
  if (numOfExpr == 0) {
108,235,756✔
1387
    return TSDB_CODE_SUCCESS;
38,829,134✔
1388
  }
1389

1390
  taosRLockLatch(lock);
69,406,622✔
1391
  void* uidData = taosHashGet(metaCache, &uid, LONG_BYTES);
69,503,327✔
1392
  if (uidData == NULL) {
69,495,449✔
1393
    stError("%s error uidData is null,uid:%"PRIu64, __func__, uid);
×
1394
  } else {
1395
    tagCache = *(SArray**)uidData;
69,495,449✔
1396
    if(taosArrayGetSize(tagCache) != numOfExpr) {
69,495,814✔
1397
      stError("%s numOfExpr:%d,tagCache size:%zu", __func__, numOfExpr, taosArrayGetSize(tagCache));
×
1398
      tagCache = NULL;
×
1399
    }
1400
  }
1401
  
1402
  for (int32_t j = 0; j < numOfExpr; ++j) {
311,964,343✔
1403
    const SExprInfo* pExpr1 = &pExprInfo[j];
242,430,450✔
1404
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;
242,469,148✔
1405

1406
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
242,494,592✔
1407
    STREAM_CHECK_NULL_GOTO(pColInfoData, terrno);
242,431,270✔
1408
    int32_t functionId = pExpr1->pExpr->_function.functionId;
242,431,270✔
1409

1410
    // this is to handle the tbname
1411
    if (fmIsScanPseudoColumnFunc(functionId)) {
242,451,812✔
1412
      int32_t fType = pExpr1->pExpr->_function.functionType;
3,623,098✔
1413
      if (fType == FUNCTION_TYPE_TBNAME) {
3,623,098✔
1414
        pColInfoData->info.colId = -1;
3,622,824✔
1415
      }
1416
    } 
1417
    char* data = tagCache == NULL ? NULL : taosArrayGetP(tagCache, j);
242,359,422✔
1418

1419
    bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
242,255,657✔
1420
    if (isNullVal) {
242,262,139✔
1421
      colDataSetNNULL(pColInfoData, currentRow, numOfRows);
×
1422
    } else {
1423
      if (!IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
242,262,139✔
1424
        for (uint32_t i = 0; i < numOfRows; i++){
2,147,483,647✔
1425
          colDataClearNull_f(pColInfoData->nullbitmap, currentRow + i);
2,147,483,647✔
1426
        }
1427
      }
1428
      code = colDataSetNItems(pColInfoData, currentRow, (const char*)data, numOfRows, numOfBlocks, false);
242,345,154✔
1429
      STREAM_CHECK_RET_GOTO(code);
242,473,357✔
1430
    }
1431
  }
1432
end:
69,533,893✔
1433
  taosRUnLockLatch(lock);
69,533,893✔
1434
  return code;
69,500,939✔
1435
}
1436

1437
static int32_t processTag(SStreamTriggerReaderInfo* info, bool isCalc, 
3,516,002✔
1438
  uint64_t uid, SSDataBlock* pBlock, uint32_t currentRow, uint32_t numOfRows, uint32_t numOfBlocks) {
1439
  int32_t     code = 0;
3,516,002✔
1440
  int32_t     lino = 0;
3,516,002✔
1441

1442
  void* pTask = info->pTask;
3,516,002✔
1443
  ST_TASK_DLOG("%s start. rows:%" PRIu32 ",uid:%"PRIu64, __func__,  numOfRows, uid);
3,516,002✔
1444
  
1445
  SHashObj* metaCache = isCalc ? info->pTableMetaCacheCalc : info->pTableMetaCacheTrigger;
3,515,992✔
1446
  SExprInfo*   pExprInfo = isCalc ? info->pExprInfoCalcTag : info->pExprInfoTriggerTag; 
3,516,000✔
1447
  int32_t      numOfExpr = isCalc ? info->numOfExprCalcTag : info->numOfExprTriggerTag;
3,516,232✔
1448
  
1449
  code = fillTag(metaCache, pExprInfo, numOfExpr, uid, pBlock, currentRow, numOfRows, numOfBlocks, &info->lock);
3,516,232✔
1450
  STREAM_CHECK_RET_GOTO(code);
3,515,285✔
1451

1452
end:
3,515,285✔
1453
  return code;
3,515,285✔
1454
}
1455

1456
int32_t getRowRange(SColData* pCol, STimeWindow* window, int32_t* rowStart, int32_t* rowEnd, int32_t* nRows) {
×
1457
  int32_t code = 0;
×
1458
  int32_t lino = 0;
×
1459
  *nRows = 0;
×
1460
  *rowStart = 0;
×
1461
  *rowEnd = pCol->nVal;
×
1462
  if (window != NULL) {
×
1463
    SColVal colVal = {0};
×
1464
    *rowStart = -1;
×
1465
    *rowEnd = -1;
×
1466
    for (int32_t k = 0; k < pCol->nVal; k++) {
×
1467
      STREAM_CHECK_RET_GOTO(tColDataGetValue(pCol, k, &colVal));
×
1468
      int64_t ts = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
×
1469
      if (ts >= window->skey && *rowStart == -1) {
×
1470
        *rowStart = k;
×
1471
      }
1472
      if (ts > window->ekey && *rowEnd == -1) {
×
1473
        *rowEnd = k;
×
1474
      }
1475
    }
1476
    STREAM_CHECK_CONDITION_GOTO(*rowStart == -1 || *rowStart == *rowEnd, TDB_CODE_SUCCESS);
×
1477

1478
    if (*rowStart != -1 && *rowEnd == -1) {
×
1479
      *rowEnd = pCol->nVal;
×
1480
    }
1481
  }
1482
  *nRows = *rowEnd - *rowStart;
×
1483

1484
end:
×
1485
  return code;
×
1486
}
1487

1488
static int32_t setColData(int64_t rows, int32_t rowStart, int32_t rowEnd, SColData* colData, SColumnInfoData* pColData) {
×
1489
  int32_t code = 0;
×
1490
  int32_t lino = 0;
×
1491
  for (int32_t k = rowStart; k < rowEnd; k++) {
×
1492
    SColVal colVal = {0};
×
1493
    STREAM_CHECK_RET_GOTO(tColDataGetValue(colData, k, &colVal));
×
1494
    STREAM_CHECK_RET_GOTO(colDataSetVal(pColData, rows + k - rowStart, VALUE_GET_DATUM(&colVal.value, colVal.value.type),
×
1495
                                        !COL_VAL_IS_VALUE(&colVal)));
1496
  }
1497
  end:
×
1498
  return code;
×
1499
}
1500

1501
static int32_t getColId(int64_t suid, int64_t uid, int16_t i, SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* rsp, int16_t* colId) {
21,353,083✔
1502
  int32_t code = 0;
21,353,083✔
1503
  int32_t lino = 0;
21,353,083✔
1504
  int64_t id[2] = {suid, uid};
21,353,083✔
1505
  taosRLockLatch(&sStreamReaderInfo->lock);
21,353,083✔
1506
  void *px = tSimpleHashGet(rsp->isCalc ? sStreamReaderInfo->uidHashCalc : sStreamReaderInfo->uidHashTrigger, id, sizeof(id));
21,353,318✔
1507
  STREAM_CHECK_NULL_GOTO(px, TSDB_CODE_INVALID_PARA);
21,352,848✔
1508
  SSHashObj* uInfo = *(SSHashObj **)px;
21,352,848✔
1509
  STREAM_CHECK_NULL_GOTO(uInfo, TSDB_CODE_INVALID_PARA);
21,352,848✔
1510
  int16_t*  tmp = tSimpleHashGet(uInfo, &i, sizeof(i));
21,352,848✔
1511
  if (tmp != NULL) {
21,352,166✔
1512
    *colId = *tmp;
19,650,706✔
1513
  } else {
1514
    *colId = -1;
1,701,460✔
1515
  }
1516

1517
end:
21,352,401✔
1518
  taosRUnLockLatch(&sStreamReaderInfo->lock);
21,352,166✔
1519
  return code;
21,353,083✔
1520
}
1521

1522
static int32_t getSchemas(SVnode* pVnode, int64_t suid, int64_t uid, int32_t sver, SStreamTriggerReaderInfo* sStreamReaderInfo, STSchema** schema) {
10,215,980✔
1523
  int32_t code = 0;
10,215,980✔
1524
  int32_t lino = 0;
10,215,980✔
1525
  int64_t id = suid != 0 ? suid : uid;
10,215,980✔
1526
  if (sStreamReaderInfo->isVtableStream) {
10,215,980✔
1527
    STSchema** schemaTmp = taosHashGet(sStreamReaderInfo->triggerTableSchemaMapVTable, &id, LONG_BYTES);
6,889,490✔
1528
    if (schemaTmp == NULL || *schemaTmp == NULL || (*schemaTmp)->version != sver) {
6,889,725✔
1529
      *schema = metaGetTbTSchema(pVnode->pMeta, id, sver, 1);
63,748✔
1530
      STREAM_CHECK_NULL_GOTO(*schema, terrno);
63,748✔
1531
      code = taosHashPut(sStreamReaderInfo->triggerTableSchemaMapVTable, &id, LONG_BYTES, schema, POINTER_BYTES);
63,748✔
1532
      if (code != 0) {
63,748✔
1533
        taosMemoryFree(*schema);
×
1534
        goto end;
×
1535
      }
1536
    } else {
1537
      *schema = *schemaTmp;
6,825,977✔
1538
    }
1539
  } else {
1540
    if (sStreamReaderInfo->triggerTableSchema == NULL || sStreamReaderInfo->triggerTableSchema->version != sver) {
3,326,256✔
1541
      taosMemoryFree(sStreamReaderInfo->triggerTableSchema);
82,624✔
1542
      sStreamReaderInfo->triggerTableSchema = metaGetTbTSchema(pVnode->pMeta, id, sver, 1);
82,624✔
1543
      STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->triggerTableSchema, terrno);
82,390✔
1544
    }
1545
    *schema = sStreamReaderInfo->triggerTableSchema;
3,326,024✔
1546
  }
1547
  
1548
end:
10,215,749✔
1549
  return code;
10,215,749✔
1550
}
1551

1552
static int32_t scanSubmitTbData(SVnode* pVnode, SDecoder *pCoder, SStreamTriggerReaderInfo* sStreamReaderInfo, 
11,011,920✔
1553
  SSHashObj* ranges, SSHashObj* gidHash, SSTriggerWalNewRsp* rsp, int64_t ver) {
1554
  int32_t code = 0;
11,011,920✔
1555
  int32_t lino = 0;
11,011,920✔
1556
  uint64_t id = 0;
11,011,920✔
1557
  WalMetaResult walMeta = {0};
11,012,157✔
1558
  void* pTask = sStreamReaderInfo->pTask;
11,011,693✔
1559
  SSDataBlock * pBlock = (SSDataBlock*)rsp->dataBlock;
11,011,218✔
1560

1561
  if (tStartDecode(pCoder) < 0) {
11,011,221✔
1562
    ST_TASK_ELOG("vgId:%d %s invalid submit data", TD_VID(pVnode), __func__);
×
1563
    code = TSDB_CODE_INVALID_MSG;
×
1564
    TSDB_CHECK_CODE(code, lino, end);
×
1565
  }
1566

1567
  SSubmitTbData submitTbData = {0};
11,012,165✔
1568
  uint8_t       version = 0;
11,011,216✔
1569
  if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) {
11,011,925✔
1570
    ST_TASK_ELOG("vgId:%d %s invalid submit data flags", TD_VID(pVnode), __func__);
×
1571
    code = TSDB_CODE_INVALID_MSG;
×
1572
    TSDB_CHECK_CODE(code, lino, end);
×
1573
  }
1574
  version = (submitTbData.flags >> 8) & 0xff;
11,011,925✔
1575
  submitTbData.flags = submitTbData.flags & 0xff;
11,011,925✔
1576
  // STREAM_CHECK_CONDITION_GOTO(version < 2, TDB_CODE_SUCCESS);
1577
  if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
11,011,925✔
1578
    if (tStartDecode(pCoder) < 0) {
27,073✔
1579
      ST_TASK_ELOG("vgId:%d %s invalid auto create table data", TD_VID(pVnode), __func__);
×
1580
      code = TSDB_CODE_INVALID_MSG;
×
1581
      TSDB_CHECK_CODE(code, lino, end);
×
1582
    }
1583
    tEndDecode(pCoder);
27,073✔
1584
  }
1585

1586
  // submit data
1587
  if (tDecodeI64(pCoder, &submitTbData.suid) < 0) {
11,010,763✔
1588
    ST_TASK_ELOG("vgId:%d %s invalid submit data suid", TD_VID(pVnode), __func__);
×
1589
    code = TSDB_CODE_INVALID_MSG;
×
1590
    TSDB_CHECK_CODE(code, lino, end);
×
1591
  }
1592
  if (tDecodeI64(pCoder, &submitTbData.uid) < 0) {
11,010,064✔
1593
    ST_TASK_ELOG("vgId:%d %s invalid submit data uid", TD_VID(pVnode), __func__);
×
1594
    code = TSDB_CODE_INVALID_MSG;
×
1595
    TSDB_CHECK_CODE(code, lino, end);
×
1596
  }
1597

1598
  ST_TASK_DLOG("%s uid:%" PRId64 ", suid:%" PRId64 ", ver:%" PRId64, __func__, submitTbData.uid, submitTbData.suid, ver);
11,010,064✔
1599

1600
  if (rsp->uidHash != NULL) {
11,010,767✔
1601
    uint64_t* gid = tSimpleHashGet(rsp->uidHash, &submitTbData.uid, LONG_BYTES);
8,412,020✔
1602
    STREAM_CHECK_CONDITION_GOTO(gid == NULL, TDB_CODE_SUCCESS);
8,412,486✔
1603
    ST_TASK_DLOG("%s get uid gid from uidHash, uid:%" PRId64 ", suid:%" PRId64 " gid:%"PRIu64, __func__, submitTbData.uid, submitTbData.suid, *gid);
8,412,486✔
1604
    id = *gid;
8,411,722✔
1605
  } else {
1606
    STREAM_CHECK_CONDITION_GOTO(!uidInTableListSet(sStreamReaderInfo, submitTbData.suid, submitTbData.uid, &id, rsp->isCalc), TDB_CODE_SUCCESS);
2,598,991✔
1607
  }
1608

1609
  walMeta.id = id;
10,215,982✔
1610
  STimeWindow window = {.skey = INT64_MIN, .ekey = INT64_MAX};
10,215,982✔
1611

1612
  if (ranges != NULL){
10,216,214✔
1613
    void* timerange = tSimpleHashGet(ranges, &id, sizeof(id));
8,412,486✔
1614
    if (timerange == NULL) goto end;;
8,412,489✔
1615
    int64_t* pRange = (int64_t*)timerange;
8,412,489✔
1616
    window.skey = pRange[0];
8,412,489✔
1617
    window.ekey = pRange[1];
8,412,489✔
1618
    ST_TASK_DLOG("%s get time range from ranges, uid:%" PRId64 ", suid:%" PRId64 ", gid:%" PRIu64 ", skey:%" PRId64 ", ekey:%" PRId64,
8,412,489✔
1619
      __func__, submitTbData.uid, submitTbData.suid, id, window.skey, window.ekey);
1620
  }
1621
  
1622
  if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) {
10,215,515✔
1623
    ST_TASK_ELOG("vgId:%d %s invalid submit data sver", TD_VID(pVnode), __func__);
×
1624
    code = TSDB_CODE_INVALID_MSG;
×
1625
    TSDB_CHECK_CODE(code, lino, end);
×
1626
  }
1627

1628
  STSchema*    schema = NULL;
10,215,515✔
1629
  STREAM_CHECK_RET_GOTO(getSchemas(pVnode, submitTbData.suid, submitTbData.uid, submitTbData.sver, sStreamReaderInfo, &schema));
10,215,513✔
1630

1631
  SStreamWalDataSlice* pSlice = (SStreamWalDataSlice*)tSimpleHashGet(rsp->indexHash, &submitTbData.uid, LONG_BYTES);
10,215,749✔
1632
  int32_t blockStart = 0;
10,216,214✔
1633
  int32_t numOfRows = 0;
10,216,214✔
1634
  if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
10,215,536✔
1635
    uint64_t nColData = 0;
×
1636
    if (tDecodeU64v(pCoder, &nColData) < 0) {
×
1637
      ST_TASK_ELOG("vgId:%d %s invalid submit data nColData", TD_VID(pVnode), __func__);
×
1638
      code = TSDB_CODE_INVALID_MSG;
×
1639
      TSDB_CHECK_CODE(code, lino, end);
×
1640
    }
1641

1642
    SColData colData = {0};
×
1643
    code = tDecodeColData(version, pCoder, &colData, false);
×
1644
    if (code) {
×
1645
      ST_TASK_ELOG("vgId:%d %s invalid submit data colData", TD_VID(pVnode), __func__);
×
1646
      code = TSDB_CODE_INVALID_MSG;
×
1647
      TSDB_CHECK_CODE(code, lino, end);
×
1648
    }
1649

1650
    if (colData.flag != HAS_VALUE) {
×
1651
      ST_TASK_ELOG("vgId:%d %s invalid submit data colData flag", TD_VID(pVnode), __func__);
×
1652
      code = TSDB_CODE_INVALID_MSG;
×
1653
      TSDB_CHECK_CODE(code, lino, end);
×
1654
    }
1655
    
1656
    walMeta.skey = ((TSKEY *)colData.pData)[0];
×
1657
    walMeta.ekey = ((TSKEY *)colData.pData)[colData.nVal - 1];
×
1658

1659
    int32_t rowStart = 0;
×
1660
    int32_t rowEnd = 0;
×
1661
    STREAM_CHECK_RET_GOTO(getRowRange(&colData, &window, &rowStart, &rowEnd, &numOfRows));
×
1662
    STREAM_CHECK_CONDITION_GOTO(numOfRows <= 0, TDB_CODE_SUCCESS);
×
1663

1664
    STREAM_CHECK_NULL_GOTO(pSlice, TSDB_CODE_INVALID_PARA);
×
1665
    blockStart = pSlice->currentRowIdx;
×
1666
    int32_t pos = pCoder->pos;
×
1667
    for (int16_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
×
1668
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
×
1669
      STREAM_CHECK_NULL_GOTO(pColData, terrno);
×
1670
      if (pColData->info.colId <= -1) {
×
1671
        pColData->hasNull = true;
×
1672
        continue;
×
1673
      }
1674
      if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
×
1675
        STREAM_CHECK_RET_GOTO(setColData(blockStart, rowStart, rowEnd, &colData, pColData));
×
1676
        continue;
×
1677
      }
1678

1679
      pCoder->pos = pos;
×
1680

1681
      int16_t colId = 0;
×
1682
      if (sStreamReaderInfo->isVtableStream){
×
1683
        STREAM_CHECK_RET_GOTO(getColId(submitTbData.suid, submitTbData.uid, i, sStreamReaderInfo, rsp, &colId));
×
1684
        ST_TASK_TLOG("%s vtable colId:%d, i:%d, uid:%" PRId64, __func__, colId, i, submitTbData.uid);
×
1685
      } else {
1686
        colId = pColData->info.colId;
×
1687
      }
1688
      
1689
      uint64_t j = 1;
×
1690
      for (; j < nColData; j++) {
×
1691
        int16_t cid = 0;
×
1692
        int32_t posTmp = pCoder->pos;
×
1693
        pCoder->pos += INT_BYTES;
×
1694
        if ((code = tDecodeI16v(pCoder, &cid))) return code;
×
1695
        pCoder->pos = posTmp;
×
1696
        if (cid == colId) {
×
1697
          SColData colDataTmp = {0};
×
1698
          code = tDecodeColData(version, pCoder, &colDataTmp, false);
×
1699
          if (code) {
×
1700
            code = TSDB_CODE_INVALID_MSG;
×
1701
            TSDB_CHECK_CODE(code, lino, end);
×
1702
          }
1703
          STREAM_CHECK_RET_GOTO(setColData(blockStart, rowStart, rowEnd, &colDataTmp, pColData));
×
1704
          break;
×
1705
        }
1706
        code = tDecodeColData(version, pCoder, &colData, true);
×
1707
        if (code) {
×
1708
          code = TSDB_CODE_INVALID_MSG;
×
1709
          TSDB_CHECK_CODE(code, lino, end);
×
1710
        }
1711
      }
1712
      if (j == nColData) {
×
1713
        colDataSetNNULL(pColData, blockStart, numOfRows);
×
1714
      }
1715
    }
1716
  } else {
1717
    uint64_t nRow = 0;
10,215,536✔
1718
    if (tDecodeU64v(pCoder, &nRow) < 0) {
10,215,514✔
1719
      code = TSDB_CODE_INVALID_MSG;
×
1720
      TSDB_CHECK_CODE(code, lino, end);
×
1721
    }
1722
    for (uint64_t iRow = 0; iRow < nRow; ++iRow) {
21,691,535✔
1723
      SRow *pRow = (SRow *)(pCoder->data + pCoder->pos);
11,475,293✔
1724
      pCoder->pos += pRow->len;
11,475,524✔
1725

1726
      if (iRow == 0){
11,475,056✔
1727
#ifndef NO_UNALIGNED_ACCESS
1728
        walMeta.skey = pRow->ts;
10,214,815✔
1729
#else
1730
        walMeta.skey = taosGetInt64Aligned(&pRow->ts);
1731
#endif
1732
      }
1733
      if (iRow == nRow - 1) {
11,475,524✔
1734
#ifndef NO_UNALIGNED_ACCESS
1735
        walMeta.ekey = pRow->ts;
10,215,283✔
1736
#else
1737
        walMeta.ekey = taosGetInt64Aligned(&pRow->ts);
1738
#endif
1739
      }
1740

1741
      if (pRow->ts < window.skey || pRow->ts > window.ekey) {
11,475,055✔
1742
        continue;
7,796✔
1743
      }
1744
      STREAM_CHECK_NULL_GOTO(pSlice, TSDB_CODE_INVALID_PARA);
11,467,957✔
1745
      blockStart = pSlice->currentRowIdx;
11,467,957✔
1746
     
1747
      for (int16_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {  // reader todo test null
71,114,042✔
1748
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
59,647,273✔
1749
        STREAM_CHECK_NULL_GOTO(pColData, terrno);
59,647,928✔
1750
        if (pColData->info.colId <= -1) {
59,647,928✔
1751
          pColData->hasNull = true;
20,380,782✔
1752
          continue;
20,381,246✔
1753
        }
1754
        int16_t colId = 0;
39,275,147✔
1755
        if (sStreamReaderInfo->isVtableStream){
39,275,149✔
1756
          STREAM_CHECK_RET_GOTO(getColId(submitTbData.suid, submitTbData.uid, i, sStreamReaderInfo, rsp, &colId));
21,351,673✔
1757
          ST_TASK_TLOG("%s vtable colId:%d, i:%d, uid:%" PRId64, __func__, colId, i, submitTbData.uid);
21,352,848✔
1758
        } else {
1759
          colId = pColData->info.colId;
17,924,415✔
1760
        }
1761
        
1762
        SColVal colVal = {0};
39,276,565✔
1763
        int32_t sourceIdx = 0;
39,276,567✔
1764
        while (1) {
1765
          if (sourceIdx >= schema->numOfCols) {
106,979,567✔
1766
            break;
8,800,957✔
1767
          }
1768
          STREAM_CHECK_RET_GOTO(tRowGet(pRow, schema, sourceIdx, &colVal));
98,153,694✔
1769
          if (colVal.cid == colId) {
98,153,827✔
1770
            break;
30,450,827✔
1771
          }
1772
          sourceIdx++;
67,703,000✔
1773
        }
1774
        if (colVal.cid == colId && COL_VAL_IS_VALUE(&colVal)) {
39,251,784✔
1775
          if (IS_VAR_DATA_TYPE(colVal.value.type) || colVal.value.type == TSDB_DATA_TYPE_DECIMAL){
28,084,209✔
1776
            STREAM_CHECK_RET_GOTO(varColSetVarData(pColData, blockStart+ numOfRows, (const char*)colVal.value.pData, colVal.value.nData, !COL_VAL_IS_VALUE(&colVal)));
24,997✔
1777
            ST_TASK_TLOG("%s vtable colId:%d, i:%d, colData:%p, data:%s, len:%d, rowIndex:%d, offset:%d, uid:%" PRId64, __func__, colId, i, pColData, 
26,170✔
1778
              (const char*)colVal.value.pData, colVal.value.nData, blockStart+ numOfRows, pColData->varmeta.offset[blockStart+ numOfRows], submitTbData.uid);
1779
          } else {
1780
            STREAM_CHECK_RET_GOTO(colDataSetVal(pColData, blockStart + numOfRows, (const char*)(&(colVal.value.val)), !COL_VAL_IS_VALUE(&colVal)));
28,059,212✔
1781
          }
1782
        } else {
1783
          colDataSetNULL(pColData, blockStart + numOfRows);
11,167,575✔
1784
        }
1785
      }
1786
      
1787
      numOfRows++;
11,468,225✔
1788
    }
1789
  }
1790

1791
  if (numOfRows > 0) {
10,215,080✔
1792
    if (!sStreamReaderInfo->isVtableStream) {
10,215,080✔
1793
      STREAM_CHECK_RET_GOTO(processTag(sStreamReaderInfo, rsp->isCalc, submitTbData.uid, pBlock, blockStart, numOfRows, 1));
3,325,824✔
1794
    }
1795
    
1796
    SColumnInfoData* pColData = taosArrayGetLast(pBlock->pDataBlock);
10,214,566✔
1797
    STREAM_CHECK_NULL_GOTO(pColData, terrno);
10,214,558✔
1798
    STREAM_CHECK_RET_GOTO(colDataSetNItems(pColData, blockStart, (const char*)&ver, numOfRows, 1, false));
10,214,558✔
1799

1800
    STREAM_CHECK_NULL_GOTO(pSlice, TSDB_CODE_INVALID_PARA);
10,215,744✔
1801
    ST_TASK_DLOG("%s process submit data:skey %" PRId64 ", ekey %" PRId64 ", id %" PRIu64
10,215,744✔
1802
      ", uid:%" PRId64 ", ver:%"PRId64 ", row index:%d, rows:%d", __func__, window.skey, window.ekey, 
1803
      id, submitTbData.uid, ver, pSlice->currentRowIdx, numOfRows);
1804
    pSlice->currentRowIdx += numOfRows;
10,215,744✔
1805
    pBlock->info.rows += numOfRows;
10,215,744✔
1806
  } else {
1807
    ST_TASK_DLOG("%s no valid data in time range:skey %" PRId64 ", ekey %" PRId64 ", uid:%" PRId64 ", suid:%" PRId64,
×
1808
      __func__, window.skey, window.ekey, submitTbData.uid, submitTbData.suid);
1809
  }
1810
  
1811
  if (gidHash == NULL) goto end;
10,215,515✔
1812

1813
  WalMetaResult* data = (WalMetaResult*)tSimpleHashGet(gidHash, &walMeta.id, LONG_BYTES);
1,803,264✔
1814
  if (data != NULL) {
1,803,496✔
1815
    if (walMeta.skey < data->skey) data->skey = walMeta.skey;
×
1816
    if (walMeta.ekey > data->ekey) data->ekey = walMeta.ekey;
×
1817
  } else {
1818
    STREAM_CHECK_RET_GOTO(tSimpleHashPut(gidHash, &walMeta.id, LONG_BYTES, &walMeta, sizeof(WalMetaResult)));
1,803,496✔
1819
  }
1820

1821
end:
11,009,058✔
1822
  if (code != 0) {                                                             \
11,012,156✔
1823
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code)); \
×
1824
  }
1825
  tEndDecode(pCoder);
11,012,156✔
1826
  return code;
11,012,159✔
1827
}
1828
static int32_t scanSubmitData(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamReaderInfo,
11,011,953✔
1829
  void* data, int32_t len, SSHashObj* ranges, SSTriggerWalNewRsp* rsp, int64_t ver) {
1830
  int32_t  code = 0;
11,011,953✔
1831
  int32_t  lino = 0;
11,011,953✔
1832
  SDecoder decoder = {0};
11,011,953✔
1833
  SSHashObj* gidHash = NULL;
11,011,951✔
1834
  void* pTask = sStreamReaderInfo->pTask;
11,011,951✔
1835

1836
  tDecoderInit(&decoder, data, len);
11,012,185✔
1837
  if (tStartDecode(&decoder) < 0) {
11,008,893✔
1838
    code = TSDB_CODE_INVALID_MSG;
×
1839
    TSDB_CHECK_CODE(code, lino, end);
×
1840
  }
1841

1842
  uint64_t nSubmitTbData = 0;
11,010,997✔
1843
  if (tDecodeU64v(&decoder, &nSubmitTbData) < 0) {
11,010,058✔
1844
    code = TSDB_CODE_INVALID_MSG;
×
1845
    TSDB_CHECK_CODE(code, lino, end);
×
1846
  }
1847

1848
  if (rsp->metaBlock != NULL){
11,010,058✔
1849
    gidHash = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
2,597,348✔
1850
    STREAM_CHECK_NULL_GOTO(gidHash, terrno);
2,595,607✔
1851
  }
1852

1853
  for (uint64_t i = 0; i < nSubmitTbData; i++) {
22,020,486✔
1854
    STREAM_CHECK_RET_GOTO(scanSubmitTbData(pVnode, &decoder, sStreamReaderInfo, ranges, gidHash, rsp, ver));
11,009,582✔
1855
  }
1856

1857
  tEndDecode(&decoder);
11,010,904✔
1858

1859
  if (rsp->metaBlock != NULL){
11,011,220✔
1860
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->metaBlock, ((SSDataBlock*)rsp->metaBlock)->info.rows + tSimpleHashGetSize(gidHash)));
2,599,444✔
1861
    int32_t iter = 0;
2,599,458✔
1862
    void*   px = tSimpleHashIterate(gidHash, NULL, &iter);
2,599,458✔
1863
    while (px != NULL) {
4,401,540✔
1864
      WalMetaResult* pMeta = (WalMetaResult*)px;
1,803,032✔
1865
      STREAM_CHECK_RET_GOTO(buildWalMetaBlockNew(rsp->metaBlock, pMeta->id, pMeta->skey, pMeta->ekey, ver));
1,803,032✔
1866
      ((SSDataBlock*)rsp->metaBlock)->info.rows++;
1,803,488✔
1867
      rsp->totalRows++;
1,803,488✔
1868
      ST_TASK_DLOG("%s process meta data:skey %" PRId64 ", ekey %" PRId64 ", id %" PRIu64
1,803,728✔
1869
            ", ver:%"PRId64, __func__, pMeta->skey, pMeta->ekey, pMeta->id, ver);
1870
      px = tSimpleHashIterate(gidHash, px, &iter);
1,803,728✔
1871
    }
1872
  }
1873
  
1874

1875
end:
11,006,783✔
1876
  tSimpleHashCleanup(gidHash);
11,010,524✔
1877
  tDecoderClear(&decoder);
11,011,692✔
1878
  return code;
11,012,393✔
1879
}
1880

1881
static int32_t scanSubmitTbDataPre(SDecoder *pCoder, SStreamTriggerReaderInfo* sStreamReaderInfo, SSHashObj* ranges, 
13,058,041✔
1882
  uint64_t* gid, int64_t* uid, int32_t* numOfRows, SSTriggerWalNewRsp* rsp, int64_t ver) {
1883
  int32_t code = 0;
13,058,041✔
1884
  int32_t lino = 0;
13,058,041✔
1885
  void* pTask = sStreamReaderInfo->pTask;
13,058,041✔
1886

1887
  if (tStartDecode(pCoder) < 0) {
13,058,271✔
1888
    code = TSDB_CODE_INVALID_MSG;
×
1889
    TSDB_CHECK_CODE(code, lino, end);
×
1890
  }
1891

1892
  SSubmitTbData submitTbData = {0};
13,058,766✔
1893
  uint8_t       version = 0;
13,058,766✔
1894
  if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) {
13,058,970✔
1895
    code = TSDB_CODE_INVALID_MSG;
×
1896
    TSDB_CHECK_CODE(code, lino, end);
×
1897
  }
1898
  version = (submitTbData.flags >> 8) & 0xff;
13,058,970✔
1899
  submitTbData.flags = submitTbData.flags & 0xff;
13,058,970✔
1900

1901
  // STREAM_CHECK_CONDITION_GOTO(version < 2, TDB_CODE_SUCCESS);
1902
  if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
13,058,970✔
1903
    submitTbData.pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
799,103✔
1904
    STREAM_CHECK_NULL_GOTO(submitTbData.pCreateTbReq, terrno);
799,337✔
1905
    STREAM_CHECK_RET_GOTO(tDecodeSVCreateTbReq(pCoder, submitTbData.pCreateTbReq));
799,337✔
1906
    STREAM_CHECK_RET_GOTO(processAutoCreateTableNew(sStreamReaderInfo, submitTbData.pCreateTbReq, ver));
799,561✔
1907
  }
1908

1909
  // submit data
1910
  if (tDecodeI64(pCoder, &submitTbData.suid) < 0) {
13,058,964✔
1911
    code = TSDB_CODE_INVALID_MSG;
×
1912
    TSDB_CHECK_CODE(code, lino, end);
×
1913
  }
1914
  if (tDecodeI64(pCoder, uid) < 0) {
13,056,334✔
1915
    code = TSDB_CODE_INVALID_MSG;
×
1916
    TSDB_CHECK_CODE(code, lino, end);
×
1917
  }
1918
  ST_TASK_DLOG("%s uid:%" PRId64 ", suid:%" PRId64, __func__, *uid, submitTbData.suid);
13,056,334✔
1919
  STREAM_CHECK_CONDITION_GOTO(!uidInTableListSet(sStreamReaderInfo, submitTbData.suid, *uid, gid, rsp->isCalc), TDB_CODE_SUCCESS);
13,057,818✔
1920
  if (rsp->uidHash != NULL) {
10,215,745✔
1921
    STREAM_CHECK_RET_GOTO(tSimpleHashPut(rsp->uidHash, uid, LONG_BYTES, gid, LONG_BYTES));
8,412,721✔
1922
    ST_TASK_DLOG("%s put uid into uidHash, uid:%" PRId64 ", suid:%" PRId64 " gid:%"PRIu64, __func__, *uid, submitTbData.suid, *gid);
8,412,517✔
1923
  }
1924
  STimeWindow window = {.skey = INT64_MIN, .ekey = INT64_MAX};
10,216,217✔
1925

1926
  if (ranges != NULL){
10,215,985✔
1927
    void* timerange = tSimpleHashGet(ranges, gid, sizeof(*gid));
8,412,721✔
1928
    if (timerange == NULL) goto end;;
8,412,486✔
1929
    int64_t* pRange = (int64_t*)timerange;
8,412,486✔
1930
    window.skey = pRange[0];
8,412,486✔
1931
    window.ekey = pRange[1];
8,412,486✔
1932
  }
1933
  
1934
  if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) {
10,215,052✔
1935
    code = TSDB_CODE_INVALID_MSG;
×
1936
    TSDB_CHECK_CODE(code, lino, end);
×
1937
  }
1938

1939
  if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
10,215,052✔
1940
    uint64_t nColData = 0;
×
1941
    if (tDecodeU64v(pCoder, &nColData) < 0) {
×
1942
      code = TSDB_CODE_INVALID_MSG;
×
1943
      TSDB_CHECK_CODE(code, lino, end);
×
1944
    }
1945

1946
    SColData colData = {0};
×
1947
    code = tDecodeColData(version, pCoder, &colData, false);
×
1948
    if (code) {
×
1949
      code = TSDB_CODE_INVALID_MSG;
×
1950
      TSDB_CHECK_CODE(code, lino, end);
×
1951
    }
1952

1953
    if (colData.flag != HAS_VALUE) {
×
1954
      code = TSDB_CODE_INVALID_MSG;
×
1955
      TSDB_CHECK_CODE(code, lino, end);
×
1956
    }
1957
    int32_t rowStart = 0;
×
1958
    int32_t rowEnd = 0;
×
1959
    if (window.skey != INT64_MIN || window.ekey != INT64_MAX) {
×
1960
      STREAM_CHECK_RET_GOTO(getRowRange(&colData, &window, &rowStart, &rowEnd, numOfRows));
×
1961
    } else {
1962
      (*numOfRows) = colData.nVal;
×
1963
    } 
1964
  } else {
1965
    uint64_t nRow = 0;
10,215,052✔
1966
    if (tDecodeU64v(pCoder, &nRow) < 0) {
10,214,350✔
1967
      code = TSDB_CODE_INVALID_MSG;
×
1968
      TSDB_CHECK_CODE(code, lino, end);
×
1969
    }
1970

1971
    if (window.skey != INT64_MIN || window.ekey != INT64_MAX) { 
10,214,350✔
1972
      for (uint64_t iRow = 0; iRow < nRow; ++iRow) {
17,877,143✔
1973
        SRow *pRow = (SRow *)(pCoder->data + pCoder->pos);
9,465,596✔
1974
        pCoder->pos += pRow->len;
9,465,830✔
1975
        if (pRow->ts < window.skey || pRow->ts > window.ekey) {
9,466,065✔
1976
          continue;
7,796✔
1977
        }
1978
        (*numOfRows)++;
9,458,269✔
1979
      }
1980
    } else {
1981
      (*numOfRows) = nRow;
1,803,032✔
1982
    }
1983
  }
1984
  
1985
end:
13,059,674✔
1986
  tDestroySVSubmitCreateTbReq(submitTbData.pCreateTbReq, TSDB_MSG_FLG_DECODE);
13,058,744✔
1987
  taosMemoryFreeClear(submitTbData.pCreateTbReq);
13,056,653✔
1988
  tEndDecode(pCoder);
13,056,421✔
1989
  return code;
13,056,876✔
1990
}
1991

1992
static int32_t scanSubmitDataPre(SStreamTriggerReaderInfo* sStreamReaderInfo, void* data, int32_t len, SSHashObj* ranges, SSTriggerWalNewRsp* rsp, int64_t ver) {
13,058,044✔
1993
  int32_t  code = 0;
13,058,044✔
1994
  int32_t  lino = 0;
13,058,044✔
1995
  SDecoder decoder = {0};
13,058,044✔
1996
  void* pTask = sStreamReaderInfo->pTask;
13,058,742✔
1997

1998
  tDecoderInit(&decoder, data, len);
13,058,742✔
1999
  if (tStartDecode(&decoder) < 0) {
13,056,658✔
2000
    code = TSDB_CODE_INVALID_MSG;
×
2001
    TSDB_CHECK_CODE(code, lino, end);
×
2002
  }
2003

2004
  uint64_t nSubmitTbData = 0;
13,058,032✔
2005
  if (tDecodeU64v(&decoder, &nSubmitTbData) < 0) {
13,058,044✔
2006
    code = TSDB_CODE_INVALID_MSG;
×
2007
    TSDB_CHECK_CODE(code, lino, end);
×
2008
  }
2009
  ST_TASK_DLOG("%s nSubmitTbData:%" PRIu64 ", ver:%"PRId64 " bodyLen:%d", __func__, nSubmitTbData, ver, len);
13,058,044✔
2010

2011
  for (int32_t i = 0; i < nSubmitTbData; i++) {
26,116,080✔
2012
    uint64_t gid = -1;
13,057,822✔
2013
    int64_t  uid = 0;
13,057,582✔
2014
    int32_t numOfRows = 0;
13,058,046✔
2015
    STREAM_CHECK_RET_GOTO(scanSubmitTbDataPre(&decoder, sStreamReaderInfo, ranges, &gid, &uid, &numOfRows, rsp, ver));
13,058,280✔
2016
    if (numOfRows <= 0) {
13,058,278✔
2017
      ST_TASK_DLOG("%s no valid data uid:%" PRId64 ", gid:%" PRIu64 ", numOfRows:%d, ver:%"PRId64, __func__, uid, gid, numOfRows, ver);
2,843,927✔
2018
      continue;
2,844,159✔
2019
    }
2020
    rsp->totalRows += numOfRows;
10,214,351✔
2021
    rsp->totalDataRows += numOfRows;
10,215,745✔
2022

2023
    SStreamWalDataSlice* pSlice = (SStreamWalDataSlice*)tSimpleHashGet(rsp->indexHash, &uid, LONG_BYTES);
10,215,747✔
2024
    if (pSlice != NULL) {
10,215,748✔
2025
      pSlice->numRows += numOfRows;
9,492,729✔
2026
      ST_TASK_DLOG("%s again uid:%" PRId64 ", gid:%" PRIu64 ", total numOfRows:%d, hash:%p %d, ver:%"PRId64, __func__, uid, gid, pSlice->numRows, rsp->indexHash, tSimpleHashGetSize(rsp->indexHash), ver);
9,492,963✔
2027
      pSlice->gId = gid;
9,492,480✔
2028
    } else {
2029
      SStreamWalDataSlice tmp = {.gId=gid,.numRows=numOfRows,.currentRowIdx=0,.startRowIdx=0};
723,019✔
2030
      ST_TASK_DLOG("%s first uid:%" PRId64 ", gid:%" PRIu64 ", numOfRows:%d, hash:%p %d, ver:%"PRId64, __func__, uid, gid, tmp.numRows, rsp->indexHash, tSimpleHashGetSize(rsp->indexHash), ver);
723,251✔
2031
      STREAM_CHECK_RET_GOTO(tSimpleHashPut(rsp->indexHash, &uid, LONG_BYTES, &tmp, sizeof(tmp)));
723,251✔
2032
    } 
2033
  }
2034

2035
  tEndDecode(&decoder);
13,058,258✔
2036

2037
end:
13,058,048✔
2038
  tDecoderClear(&decoder);
13,058,283✔
2039
  return code;
13,058,981✔
2040
}
2041

2042
static void buildIndexHash(SSHashObj* indexHash, void* pTask){
611,424✔
2043
  void*   pe = NULL;
611,424✔
2044
  int32_t iter = 0;
611,424✔
2045
  int32_t index = 0;
611,424✔
2046
  while ((pe = tSimpleHashIterate(indexHash, pe, &iter)) != NULL) {
1,334,675✔
2047
    SStreamWalDataSlice* pInfo = (SStreamWalDataSlice*)pe;
723,251✔
2048
    pInfo->startRowIdx = index;
723,251✔
2049
    pInfo->currentRowIdx = index;
723,251✔
2050
    index += pInfo->numRows;
723,251✔
2051
    ST_TASK_DLOG("%s uid:%" PRId64 ", gid:%" PRIu64 ", startRowIdx:%d, numRows:%d", __func__, *(int64_t*)(tSimpleHashGetKey(pe, NULL)),
1,268,539✔
2052
    pInfo->gId, pInfo->startRowIdx, pInfo->numRows);
2053
  }
2054
}
611,424✔
2055

2056
static void printIndexHash(SSHashObj* indexHash, void* pTask){
611,424✔
2057
  if (qDebugFlag & DEBUG_TRACE) {
611,424✔
2058
    void*   pe = NULL;
2,467✔
2059
    int32_t iter = 0;
2,467✔
2060
    while ((pe = tSimpleHashIterate(indexHash, pe, &iter)) != NULL) {
5,366✔
2061
      SStreamWalDataSlice* pInfo = (SStreamWalDataSlice*)pe;
2,899✔
2062
      ST_TASK_TLOG("%s uid:%" PRId64 ", gid:%" PRIu64 ", startRowIdx:%d, numRows:%d", __func__, *(int64_t*)(tSimpleHashGetKey(pe, NULL)),
3,119✔
2063
      pInfo->gId, pInfo->startRowIdx, pInfo->numRows);
2064
    }
2065
  }
2066
}
611,424✔
2067

2068
static void filterIndexHash(SSHashObj* indexHash, SColumnInfoData* pRet){
11,757✔
2069
  void*   pe = NULL;
11,757✔
2070
  int32_t iter = 0;
11,757✔
2071
  int32_t index = 0;
11,757✔
2072
  int32_t pIndex = 0;
11,757✔
2073
  int8_t* pIndicator = (int8_t*)pRet->pData;
11,757✔
2074
  while ((pe = tSimpleHashIterate(indexHash, pe, &iter)) != NULL) {
25,114✔
2075
    SStreamWalDataSlice* pInfo = (SStreamWalDataSlice*)pe;
13,357✔
2076
    pInfo->startRowIdx = index;
13,357✔
2077
    int32_t size = pInfo->numRows;
13,357✔
2078
    for (int32_t i = 0; i < pInfo->numRows; i++) {
198,981✔
2079
      if (pIndicator && !pIndicator[pIndex++]) {
185,624✔
2080
        size--;
64,023✔
2081
      }
2082
    }
2083
    pInfo->numRows = size;
13,357✔
2084
    index += pInfo->numRows;
13,357✔
2085
    stTrace("stream reader re build index hash uid:%" PRId64 ", gid:%" PRIu64 ", startRowIdx:%d, numRows:%d", *(int64_t*)(tSimpleHashGetKey(pe, NULL)),
13,357✔
2086
    pInfo->gId, pInfo->startRowIdx, pInfo->numRows);
2087
  }
2088
}
11,757✔
2089

2090
static int32_t prepareIndexMetaData(SWalReader* pWalReader, SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* resultRsp){
3,367,443✔
2091
  int32_t      code = 0;
3,367,443✔
2092
  int32_t      lino = 0;
3,367,443✔
2093
  void* pTask = sStreamReaderInfo->pTask;
3,367,443✔
2094

2095
  code = walReaderSeekVer(pWalReader, resultRsp->ver);
3,368,113✔
2096
  if (code == TSDB_CODE_WAL_LOG_NOT_EXIST){
3,366,597✔
2097
    if (resultRsp->ver < walGetFirstVer(pWalReader->pWal)) {
2,909,471✔
2098
      resultRsp->ver = walGetFirstVer(pWalReader->pWal);
×
2099
      resultRsp->verTime = 0;
×
2100
    } else {
2101
      resultRsp->verTime = taosGetTimestampUs();
2,908,546✔
2102
    }
2103
    ST_TASK_DLOG("%s scan wal end:%s",  __func__, tstrerror(code));
2,908,636✔
2104
    code = TSDB_CODE_SUCCESS;
2,909,239✔
2105
    goto end;
2,909,239✔
2106
  }
2107
  STREAM_CHECK_RET_GOTO(code);
457,126✔
2108

2109
  while (1) {
4,757,963✔
2110
    code = walNextValidMsg(pWalReader, true);
5,215,089✔
2111
    if (code == TSDB_CODE_WAL_LOG_NOT_EXIST){
5,212,806✔
2112
      resultRsp->verTime = taosGetTimestampUs();
457,598✔
2113
      ST_TASK_DLOG("%s scan wal end:%s", __func__, tstrerror(code));
457,598✔
2114
      code = TSDB_CODE_SUCCESS;
457,598✔
2115
      goto end;
457,598✔
2116
    }
2117
    STREAM_CHECK_RET_GOTO(code);
4,755,904✔
2118
    resultRsp->ver = pWalReader->curVersion;
4,755,904✔
2119
    SWalCont* wCont = &pWalReader->pHead->head;
4,756,376✔
2120
    resultRsp->verTime = wCont->ingestTs;
4,755,568✔
2121
    void*   data = POINTER_SHIFT(wCont->body, sizeof(SMsgHead));
4,755,682✔
2122
    int32_t len = wCont->bodyLen - sizeof(SMsgHead);
4,755,304✔
2123
    int64_t ver = wCont->version;
4,755,690✔
2124
    ST_TASK_DLOG("%s scan wal ver:%" PRId64 ", type:%s, deleteData:%d, deleteTb:%d, msg len:%d", __func__,
4,754,384✔
2125
      ver, TMSG_INFO(wCont->msgType), sStreamReaderInfo->deleteReCalc, sStreamReaderInfo->deleteOutTbl, len);
2126
    if (wCont->msgType == TDMT_VND_SUBMIT) {
4,755,942✔
2127
      data = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
4,645,323✔
2128
      len = wCont->bodyLen - sizeof(SSubmitReq2Msg);
4,645,553✔
2129
      STREAM_CHECK_RET_GOTO(scanSubmitDataPre(sStreamReaderInfo, data, len, NULL, resultRsp, ver));
4,645,555✔
2130
    } else {
2131
      STREAM_CHECK_RET_GOTO(processMeta(wCont->msgType, sStreamReaderInfo, data, len, resultRsp, ver));
111,236✔
2132
    }
2133

2134
    ST_TASK_DLOG("%s scan wal next ver:%" PRId64 ", totalRows:%d", __func__, resultRsp->ver, resultRsp->totalRows);
4,757,727✔
2135
    if (resultRsp->totalRows >= STREAM_RETURN_ROWS_NUM || resultRsp->needReturn) {
4,757,727✔
2136
      break;
2137
    }
2138
  }
2139
  
UNCOV
2140
end:
×
2141
  STREAM_PRINT_LOG_END(code, lino);
3,366,371✔
2142
  return code;
3,366,373✔
2143
}
2144

2145
static int32_t prepareIndexData(SWalReader* pWalReader, SStreamTriggerReaderInfo* sStreamReaderInfo, 
8,919,938✔
2146
  SArray* versions, SSHashObj* ranges, SSTriggerWalNewRsp* rsp){
2147
  int32_t      code = 0;
8,919,938✔
2148
  int32_t      lino = 0;
8,919,938✔
2149
  void* pTask = sStreamReaderInfo->pTask;
8,919,938✔
2150

2151
  for(int32_t i = 0; i < taosArrayGetSize(versions); i++) {
17,333,927✔
2152
    int64_t *ver = taosArrayGet(versions, i);
8,412,251✔
2153
    if (ver == NULL) continue;
8,412,257✔
2154

2155
    STREAM_CHECK_RET_GOTO(walFetchHead(pWalReader, *ver));
8,412,257✔
2156
    if(pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
8,412,721✔
2157
      TAOS_CHECK_RETURN(walSkipFetchBody(pWalReader));
×
2158
      ST_TASK_TLOG("%s not data, skip, ver:%"PRId64, __func__, *ver);
×
2159
      continue;
×
2160
    }
2161
    STREAM_CHECK_RET_GOTO(walFetchBody(pWalReader));
8,412,721✔
2162

2163
    SWalCont* wCont = &pWalReader->pHead->head;
8,412,285✔
2164
    void*   pBody = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
8,412,517✔
2165
    int32_t bodyLen = wCont->bodyLen - sizeof(SSubmitReq2Msg);
8,412,721✔
2166

2167
    STREAM_CHECK_RET_GOTO(scanSubmitDataPre(sStreamReaderInfo, pBody, bodyLen, ranges, rsp, *ver));
8,412,489✔
2168
  }
2169
  
2170
end:
8,919,474✔
2171
  return code;
8,919,474✔
2172
}
2173

2174
static int32_t filterData(SSTriggerWalNewRsp* resultRsp, SStreamTriggerReaderInfo* sStreamReaderInfo) {
611,424✔
2175
  int32_t      code = 0;
611,424✔
2176
  int32_t       lino = 0;
611,424✔
2177
  SColumnInfoData* pRet = NULL;
611,424✔
2178

2179
  int64_t totalRows = ((SSDataBlock*)resultRsp->dataBlock)->info.rows;
611,424✔
2180
  STREAM_CHECK_RET_GOTO(qStreamFilter(((SSDataBlock*)resultRsp->dataBlock), sStreamReaderInfo->pFilterInfo, &pRet));
611,424✔
2181

2182
  if (((SSDataBlock*)resultRsp->dataBlock)->info.rows < totalRows) {
611,424✔
2183
    filterIndexHash(resultRsp->indexHash, pRet);
11,757✔
2184
  }
2185

2186
end:
611,424✔
2187
  colDataDestroy(pRet);
611,424✔
2188
  taosMemoryFree(pRet);
611,424✔
2189
  return code;
611,188✔
2190
}
2191

2192
static int32_t processWalVerMetaDataNew(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamReaderInfo, 
3,366,341✔
2193
                                    SSTriggerWalNewRsp* resultRsp) {
2194
  int32_t      code = 0;
3,366,341✔
2195
  int32_t      lino = 0;
3,366,341✔
2196
  void* pTask = sStreamReaderInfo->pTask;
3,366,341✔
2197
                                        
2198
  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
3,365,517✔
2199
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);
3,365,512✔
2200
  blockDataEmpty(resultRsp->dataBlock);
3,365,512✔
2201
  blockDataEmpty(resultRsp->metaBlock);
3,362,814✔
2202
  int64_t lastVer = resultRsp->ver;                                      
3,362,867✔
2203
  STREAM_CHECK_RET_GOTO(prepareIndexMetaData(pWalReader, sStreamReaderInfo, resultRsp));
3,363,453✔
2204
  STREAM_CHECK_CONDITION_GOTO(resultRsp->totalRows == 0, TDB_CODE_SUCCESS);
3,365,207✔
2205

2206
  buildIndexHash(resultRsp->indexHash, pTask);
126,982✔
2207
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(((SSDataBlock*)resultRsp->dataBlock), resultRsp->totalRows));
126,982✔
2208
  while(lastVer < resultRsp->ver) {
2,769,854✔
2209
    STREAM_CHECK_RET_GOTO(walFetchHead(pWalReader, lastVer++));
2,643,606✔
2210
    if(pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
2,642,923✔
2211
      TAOS_CHECK_RETURN(walSkipFetchBody(pWalReader));
44,417✔
2212
      continue;
44,417✔
2213
    }
2214
    STREAM_CHECK_RET_GOTO(walFetchBody(pWalReader));
2,598,740✔
2215
    SWalCont* wCont = &pWalReader->pHead->head;
2,599,214✔
2216
    void*   pBody = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
2,599,446✔
2217
    int32_t bodyLen = wCont->bodyLen - sizeof(SSubmitReq2Msg);
2,599,446✔
2218
    ST_TASK_DLOG("process wal ver:%" PRId64 ", type:%d, bodyLen:%d", wCont->version, wCont->msgType, bodyLen);
2,599,446✔
2219
    STREAM_CHECK_RET_GOTO(scanSubmitData(pVnode, sStreamReaderInfo, pBody, bodyLen, NULL, resultRsp, wCont->version));
2,599,678✔
2220
  }
2221

2222
  int32_t metaRows = resultRsp->totalRows - ((SSDataBlock*)resultRsp->dataBlock)->info.rows;
126,982✔
2223
  STREAM_CHECK_RET_GOTO(filterData(resultRsp, sStreamReaderInfo));
126,982✔
2224
  resultRsp->totalRows = ((SSDataBlock*)resultRsp->dataBlock)->info.rows + metaRows;
126,746✔
2225

2226
end:
3,365,905✔
2227
  ST_TASK_DLOG("vgId:%d %s end, get result totalRows:%d, process:%"PRId64"/%"PRId64, TD_VID(pVnode), __func__, 
3,365,905✔
2228
          resultRsp->totalRows, resultRsp->ver, walGetAppliedVer(pWalReader->pWal));
2229
  walCloseReader(pWalReader);
3,365,905✔
2230
  return code;
3,364,981✔
2231
}
2232

2233
static int32_t processWalVerDataNew(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamReaderInfo, 
8,920,937✔
2234
                                    SArray* versions, SSHashObj* ranges, SSTriggerWalNewRsp* rsp) {
2235
  int32_t      code = 0;
8,920,937✔
2236
  int32_t      lino = 0;
8,920,937✔
2237

2238
  void* pTask = sStreamReaderInfo->pTask;
8,920,937✔
2239
  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
8,921,905✔
2240
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);
8,921,905✔
2241
  
2242
  if (taosArrayGetSize(versions) > 0) {
8,921,905✔
2243
    rsp->ver = *(int64_t*)taosArrayGetLast(versions);
484,442✔
2244
  }
2245
  
2246
  STREAM_CHECK_RET_GOTO(prepareIndexData(pWalReader, sStreamReaderInfo, versions, ranges, rsp));
8,921,905✔
2247
  STREAM_CHECK_CONDITION_GOTO(rsp->totalRows == 0, TDB_CODE_SUCCESS);
8,919,474✔
2248

2249
  ST_TASK_TLOG("%s index hash:%p %d", __func__, rsp->indexHash, tSimpleHashGetSize(rsp->indexHash));
484,442✔
2250
  buildIndexHash(rsp->indexHash, pTask);
484,442✔
2251

2252
  blockDataEmpty(rsp->dataBlock);
484,442✔
2253
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->dataBlock, rsp->totalRows));
484,442✔
2254

2255
  for(int32_t i = 0; i < taosArrayGetSize(versions); i++) {
8,896,693✔
2256
    int64_t *ver = taosArrayGet(versions, i);
8,412,486✔
2257
    if (ver == NULL) continue;
8,412,254✔
2258
    ST_TASK_TLOG("vgId:%d %s scan wal process:%"PRId64"/%"PRId64, TD_VID(pVnode), __func__, *ver, walGetAppliedVer(pWalReader->pWal));
8,412,254✔
2259

2260
    STREAM_CHECK_RET_GOTO(walFetchHead(pWalReader, *ver));
8,412,254✔
2261
    if(pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
8,412,019✔
2262
      TAOS_CHECK_RETURN(walSkipFetchBody(pWalReader));
×
2263
      continue;
×
2264
    }
2265
    STREAM_CHECK_RET_GOTO(walFetchBody(pWalReader));
8,412,254✔
2266
    SWalCont* wCont = &pWalReader->pHead->head;
8,412,486✔
2267
    void*   pBody = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
8,412,486✔
2268
    int32_t bodyLen = wCont->bodyLen - sizeof(SSubmitReq2Msg);
8,412,486✔
2269

2270
    STREAM_CHECK_RET_GOTO(scanSubmitData(pVnode, sStreamReaderInfo, pBody, bodyLen, ranges, rsp, wCont->version));
8,412,486✔
2271
  }
2272
  // printDataBlock(rsp->dataBlock, __func__, "processWalVerDataNew");
2273
  STREAM_CHECK_RET_GOTO(filterData(rsp, sStreamReaderInfo));
484,207✔
2274
  rsp->totalRows = ((SSDataBlock*)rsp->dataBlock)->info.rows;
484,442✔
2275

2276
end:
8,920,906✔
2277
  ST_TASK_DLOG("vgId:%d %s end, get result totalRows:%d, process:%"PRId64"/%"PRId64, TD_VID(pVnode), __func__, 
8,920,906✔
2278
            rsp->totalRows, rsp->ver, walGetAppliedVer(pWalReader->pWal));
2279
  walCloseReader(pWalReader);
8,920,906✔
2280
  return code;
8,921,673✔
2281
}
2282

2283
static int32_t buildScheamFromMeta(SVnode* pVnode, int64_t uid, SArray** schemas, SStorageAPI* api) {
491,320✔
2284
  int32_t code = 0;
491,320✔
2285
  int32_t lino = 0;
491,320✔
2286
  SMetaReader metaReader = {0};
491,320✔
2287
  *schemas = taosArrayInit(8, sizeof(SSchema));
491,320✔
2288
  STREAM_CHECK_NULL_GOTO(*schemas, terrno);
491,320✔
2289
  
2290
  api->metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api->metaFn);
491,320✔
2291
  STREAM_CHECK_RET_GOTO(api->metaReaderFn.getTableEntryByUid(&metaReader, uid));
491,320✔
2292

2293
  SSchemaWrapper* sSchemaWrapper = NULL;
489,028✔
2294
  if (metaReader.me.type == TD_CHILD_TABLE) {
489,028✔
2295
    int64_t suid = metaReader.me.ctbEntry.suid;
489,028✔
2296
    tDecoderClear(&metaReader.coder);
489,028✔
2297
    STREAM_CHECK_RET_GOTO(api->metaReaderFn.getTableEntryByUid(&metaReader, suid));
489,028✔
2298
    sSchemaWrapper = &metaReader.me.stbEntry.schemaRow;
489,028✔
2299
  } else if (metaReader.me.type == TD_NORMAL_TABLE) {
×
2300
    sSchemaWrapper = &metaReader.me.ntbEntry.schemaRow;
×
2301
  } else {
2302
    qError("invalid table type:%d", metaReader.me.type);
×
2303
  }
2304

2305
  for (size_t j = 0; j < sSchemaWrapper->nCols; j++) {
2,898,080✔
2306
    SSchema* s = sSchemaWrapper->pSchema + j;
2,409,052✔
2307
    STREAM_CHECK_NULL_GOTO(taosArrayPush(*schemas, s), terrno);
4,818,104✔
2308
  }
2309

2310
end:
491,320✔
2311
  api->metaReaderFn.clearReader(&metaReader);
491,320✔
2312
  STREAM_PRINT_LOG_END(code, lino);
491,320✔
2313
  if (code != 0)  {
491,320✔
2314
    taosArrayDestroy(*schemas);
2,292✔
2315
    *schemas = NULL;
2,292✔
2316
  }
2317
  return code;
491,320✔
2318
}
2319

2320
static int32_t shrinkScheams(SArray* cols, SArray* schemas) {
489,028✔
2321
  int32_t code = 0;
489,028✔
2322
  int32_t lino = 0;
489,028✔
2323
  size_t  schemaLen = taosArrayGetSize(schemas);
489,028✔
2324
  STREAM_CHECK_RET_GOTO(taosArrayEnsureCap(schemas, schemaLen + taosArrayGetSize(cols)));
489,028✔
2325
  for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
1,916,448✔
2326
    col_id_t* id = taosArrayGet(cols, i);
1,427,420✔
2327
    STREAM_CHECK_NULL_GOTO(id, terrno);
1,427,420✔
2328
    for (size_t i = 0; i < schemaLen; i++) {
3,753,960✔
2329
      SSchema* s = taosArrayGet(schemas, i);
3,753,960✔
2330
      STREAM_CHECK_NULL_GOTO(s, terrno);
3,753,960✔
2331
      if (*id == s->colId) {
3,753,960✔
2332
        STREAM_CHECK_NULL_GOTO(taosArrayPush(schemas, s), terrno);
1,427,420✔
2333
        break;
1,427,420✔
2334
      }
2335
    }
2336
  }
2337
  taosArrayPopFrontBatch(schemas, schemaLen);
489,028✔
2338

2339
end:
489,028✔
2340
  return code;
489,028✔
2341
}
2342

2343
static int32_t createTSAndCondition(int64_t start, int64_t end, SLogicConditionNode** pCond,
×
2344
                                    STargetNode* pTargetNodeTs) {
2345
  int32_t code = 0;
×
2346
  int32_t lino = 0;
×
2347

2348
  SColumnNode*         pCol = NULL;
×
2349
  SColumnNode*         pCol1 = NULL;
×
2350
  SValueNode*          pVal = NULL;
×
2351
  SValueNode*          pVal1 = NULL;
×
2352
  SOperatorNode*       op = NULL;
×
2353
  SOperatorNode*       op1 = NULL;
×
2354
  SLogicConditionNode* cond = NULL;
×
2355

2356
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pCol));
×
2357
  pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
×
2358
  pCol->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
×
2359
  pCol->node.resType.bytes = LONG_BYTES;
×
2360
  pCol->slotId = pTargetNodeTs->slotId;
×
2361
  pCol->dataBlockId = pTargetNodeTs->dataBlockId;
×
2362

2363
  STREAM_CHECK_RET_GOTO(nodesCloneNode((SNode*)pCol, (SNode**)&pCol1));
×
2364

2365
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&pVal));
×
2366
  pVal->node.resType.type = TSDB_DATA_TYPE_BIGINT;
×
2367
  pVal->node.resType.bytes = LONG_BYTES;
×
2368
  pVal->datum.i = start;
×
2369
  pVal->typeData = start;
×
2370

2371
  STREAM_CHECK_RET_GOTO(nodesCloneNode((SNode*)pVal, (SNode**)&pVal1));
×
2372
  pVal1->datum.i = end;
×
2373
  pVal1->typeData = end;
×
2374

2375
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&op));
×
2376
  op->opType = OP_TYPE_GREATER_EQUAL;
×
2377
  op->node.resType.type = TSDB_DATA_TYPE_BOOL;
×
2378
  op->node.resType.bytes = CHAR_BYTES;
×
2379
  op->pLeft = (SNode*)pCol;
×
2380
  op->pRight = (SNode*)pVal;
×
2381
  pCol = NULL;
×
2382
  pVal = NULL;
×
2383

2384
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&op1));
×
2385
  op1->opType = OP_TYPE_LOWER_EQUAL;
×
2386
  op1->node.resType.type = TSDB_DATA_TYPE_BOOL;
×
2387
  op1->node.resType.bytes = CHAR_BYTES;
×
2388
  op1->pLeft = (SNode*)pCol1;
×
2389
  op1->pRight = (SNode*)pVal1;
×
2390
  pCol1 = NULL;
×
2391
  pVal1 = NULL;
×
2392

2393
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_LOGIC_CONDITION, (SNode**)&cond));
×
2394
  cond->condType = LOGIC_COND_TYPE_AND;
×
2395
  cond->node.resType.type = TSDB_DATA_TYPE_BOOL;
×
2396
  cond->node.resType.bytes = CHAR_BYTES;
×
2397
  STREAM_CHECK_RET_GOTO(nodesMakeList(&cond->pParameterList));
×
2398
  STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)op));
×
2399
  op = NULL;
×
2400
  STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)op1));
×
2401
  op1 = NULL;
×
2402

2403
  *pCond = cond;
×
2404

2405
end:
×
2406
  if (code != 0) {
×
2407
    nodesDestroyNode((SNode*)pCol);
×
2408
    nodesDestroyNode((SNode*)pCol1);
×
2409
    nodesDestroyNode((SNode*)pVal);
×
2410
    nodesDestroyNode((SNode*)pVal1);
×
2411
    nodesDestroyNode((SNode*)op);
×
2412
    nodesDestroyNode((SNode*)op1);
×
2413
    nodesDestroyNode((SNode*)cond);
×
2414
  }
2415
  STREAM_PRINT_LOG_END(code, lino);
×
2416

2417
  return code;
×
2418
}
2419

2420
/*
2421
static int32_t createExternalConditions(SStreamRuntimeFuncInfo* data, SLogicConditionNode** pCond, STargetNode* pTargetNodeTs, STimeRangeNode* node) {
2422
  int32_t              code = 0;
2423
  int32_t              lino = 0;
2424
  SLogicConditionNode* pAndCondition = NULL;
2425
  SLogicConditionNode* cond = NULL;
2426

2427
  if (pTargetNodeTs == NULL) {
2428
    vError("stream reader %s no ts column", __func__);
2429
    return TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN;
2430
  }
2431
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_LOGIC_CONDITION, (SNode**)&cond));
2432
  cond->condType = LOGIC_COND_TYPE_OR;
2433
  cond->node.resType.type = TSDB_DATA_TYPE_BOOL;
2434
  cond->node.resType.bytes = CHAR_BYTES;
2435
  STREAM_CHECK_RET_GOTO(nodesMakeList(&cond->pParameterList));
2436

2437
  for (int i = 0; i < taosArrayGetSize(data->pStreamPesudoFuncVals); ++i) {
2438
    data->curIdx = i;
2439

2440
    SReadHandle handle = {0};
2441
    calcTimeRange(node, data, &handle.winRange, &handle.winRangeValid);
2442
    if (!handle.winRangeValid) {
2443
      stError("stream reader %s invalid time range, skey:%" PRId64 ", ekey:%" PRId64, __func__, handle.winRange.skey,
2444
              handle.winRange.ekey);
2445
      continue;
2446
    }
2447
    STREAM_CHECK_RET_GOTO(createTSAndCondition(handle.winRange.skey, handle.winRange.ekey, &pAndCondition, pTargetNodeTs));
2448
    stDebug("%s create condition skey:%" PRId64 ", eksy:%" PRId64, __func__, handle.winRange.skey, handle.winRange.ekey);
2449
    STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)pAndCondition));
2450
    pAndCondition = NULL;
2451
  }
2452

2453
  *pCond = cond;
2454

2455
end:
2456
  if (code != 0) {
2457
    nodesDestroyNode((SNode*)pAndCondition);
2458
    nodesDestroyNode((SNode*)cond);
2459
  }
2460
  STREAM_PRINT_LOG_END(code, lino);
2461

2462
  return code;
2463
}
2464
*/
2465

2466
static int32_t processCalaTimeRange(SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo, SResFetchReq* req,
503,689✔
2467
                                    STimeRangeNode* node, SReadHandle* handle, bool isExtWin) {
2468
  int32_t code = 0;
503,689✔
2469
  int32_t lino = 0;
503,689✔
2470
  void* pTask = sStreamReaderCalcInfo->pTask;
503,689✔
2471
  STimeWindow* pWin = isExtWin ? &handle->extWinRange : &handle->winRange;
503,689✔
2472
  bool* pValid = isExtWin ? &handle->extWinRangeValid : &handle->winRangeValid;
503,689✔
2473
  
2474
  if (req->pStRtFuncInfo->withExternalWindow) {
503,689✔
2475
    sStreamReaderCalcInfo->tmpRtFuncInfo.curIdx = 0;
381,721✔
2476
    sStreamReaderCalcInfo->tmpRtFuncInfo.triggerType = req->pStRtFuncInfo->triggerType;
381,721✔
2477
    sStreamReaderCalcInfo->tmpRtFuncInfo.isWindowTrigger = req->pStRtFuncInfo->isWindowTrigger;
381,721✔
2478
    sStreamReaderCalcInfo->tmpRtFuncInfo.precision = req->pStRtFuncInfo->precision;
381,721✔
2479

2480
    SSTriggerCalcParam* pFirst = taosArrayGet(req->pStRtFuncInfo->pStreamPesudoFuncVals, 0);
381,599✔
2481
    SSTriggerCalcParam* pLast = taosArrayGetLast(req->pStRtFuncInfo->pStreamPesudoFuncVals);
381,721✔
2482
    STREAM_CHECK_NULL_GOTO(pFirst, terrno);
381,721✔
2483
    STREAM_CHECK_NULL_GOTO(pLast, terrno);
381,721✔
2484

2485
    if (!node->needCalc) {
381,721✔
2486
      pWin->skey = pFirst->wstart;
272,084✔
2487
      pWin->ekey = pLast->wend;
272,084✔
2488
      *pValid = true;
272,084✔
2489
      if (req->pStRtFuncInfo->triggerType == STREAM_TRIGGER_SLIDING) {
272,084✔
2490
        pWin->ekey--;
165,309✔
2491
      }
2492
    } else {
2493
      SSTriggerCalcParam* pTmp = taosArrayGet(sStreamReaderCalcInfo->tmpRtFuncInfo.pStreamPesudoFuncVals, 0);
109,637✔
2494
      memcpy(pTmp, pFirst, sizeof(*pTmp));
109,637✔
2495

2496
      STREAM_CHECK_RET_GOTO(streamCalcCurrWinTimeRange(node, &sStreamReaderCalcInfo->tmpRtFuncInfo, pWin, pValid, 1));
109,637✔
2497
      if (*pValid) {
109,637✔
2498
        int64_t skey = pWin->skey;
109,637✔
2499

2500
        memcpy(pTmp, pLast, sizeof(*pTmp));
109,637✔
2501
        STREAM_CHECK_RET_GOTO(streamCalcCurrWinTimeRange(node, &sStreamReaderCalcInfo->tmpRtFuncInfo, pWin, pValid, 2));
109,637✔
2502

2503
        if (*pValid) {
109,637✔
2504
          pWin->skey = skey;
109,637✔
2505
        }
2506
      }
2507
      pWin->ekey--;
109,637✔
2508
    }
2509
  } else {
2510
    if (!node->needCalc) {
121,968✔
2511
      SSTriggerCalcParam* pCurr = taosArrayGet(req->pStRtFuncInfo->pStreamPesudoFuncVals, req->pStRtFuncInfo->curIdx);
79,032✔
2512
      pWin->skey = pCurr->wstart;
79,032✔
2513
      pWin->ekey = pCurr->wend;
79,032✔
2514
      *pValid = true;
79,032✔
2515
      if (req->pStRtFuncInfo->triggerType == STREAM_TRIGGER_SLIDING) {
79,032✔
2516
        pWin->ekey--;
31,612✔
2517
      }
2518
    } else {
2519
      STREAM_CHECK_RET_GOTO(streamCalcCurrWinTimeRange(node, req->pStRtFuncInfo, pWin, pValid, 3));
42,936✔
2520
      pWin->ekey--;
42,936✔
2521
    }
2522
  }
2523

2524
  ST_TASK_DLOG("%s type:%s, withExternalWindow:%d, skey:%" PRId64 ", ekey:%" PRId64 ", validRange:%d", 
503,567✔
2525
      __func__, isExtWin ? "interp range" : "scan time range", req->pStRtFuncInfo->withExternalWindow, pWin->skey, pWin->ekey, *pValid);
2526

2527
end:
6,064✔
2528

2529
  if (code) {
503,689✔
2530
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2531
  }
2532
  
2533
  return code;
503,689✔
2534
}
2535

2536
static int32_t createDataBlockTsUid(SSDataBlock** pBlockRet, uint32_t numOfRows) {
477,770✔
2537
  int32_t      code = 0;
477,770✔
2538
  int32_t      lino = 0;
477,770✔
2539
  SSDataBlock* pBlock = NULL;
477,770✔
2540
  STREAM_CHECK_RET_GOTO(createDataBlock(&pBlock));
478,182✔
2541
  SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, PRIMARYKEY_TIMESTAMP_COL_ID);
478,416✔
2542
  STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
478,416✔
2543
  idata = createColumnInfoData(TSDB_DATA_TYPE_BIGINT, LONG_BYTES, PRIMARYKEY_TIMESTAMP_COL_ID + 1);
477,959✔
2544
  STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
478,416✔
2545
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, numOfRows));
478,650✔
2546

2547
end:
478,650✔
2548
  STREAM_PRINT_LOG_END(code, lino)
478,650✔
2549
  if (code != TSDB_CODE_SUCCESS) {
478,416✔
2550
    blockDataDestroy(pBlock);
×
2551
    pBlock = NULL;
×
2552
  }
2553
  *pBlockRet = pBlock;
478,416✔
2554
  return code;
478,416✔
2555
}
2556

2557
static int32_t processTsOutPutAllTables(SStreamTriggerReaderInfo* sStreamReaderInfo, SStreamTsResponse* tsRsp, SSDataBlock* pResBlock, int32_t order) {
294,623✔
2558
  int32_t code = 0;
294,623✔
2559
  int32_t lino = 0;
294,623✔
2560
  void* pTask = sStreamReaderInfo->pTask;
294,623✔
2561

2562
  tsRsp->tsInfo = taosArrayInit(pResBlock->info.rows, sizeof(STsInfo));
294,623✔
2563
  STREAM_CHECK_NULL_GOTO(tsRsp->tsInfo, terrno);
294,623✔
2564
  SColumnInfoData* pColInfoDataTs = taosArrayGet(pResBlock->pDataBlock, 0);
294,623✔
2565
  SColumnInfoData* pColInfoDataUid = taosArrayGet(pResBlock->pDataBlock, 1);
294,623✔
2566
  for (int32_t j = 0; j < pResBlock->info.rows; j++) {
829,112✔
2567
    if (colDataIsNull_s(pColInfoDataTs, j) || pColInfoDataTs->pData == NULL) {
1,068,978✔
2568
      continue;
×
2569
    }
2570
    STsInfo* tsInfo = taosArrayReserve(tsRsp->tsInfo, 1);
534,294✔
2571
    STREAM_CHECK_NULL_GOTO(tsInfo, terrno)
534,489✔
2572
    if (order == TSDB_ORDER_ASC) {
534,489✔
2573
      tsInfo->ts = INT64_MAX;
277,042✔
2574
    } else {
2575
      tsInfo->ts = INT64_MIN;
257,447✔
2576
    }
2577
    int64_t ts = *(int64_t*)colDataGetNumData(pColInfoDataTs, j);
534,489✔
2578
    if (order == TSDB_ORDER_ASC && ts < tsInfo->ts) {
534,489✔
2579
      tsInfo->ts = ts;
277,042✔
2580
    } else if (order == TSDB_ORDER_DESC && ts > tsInfo->ts) {
257,447✔
2581
      tsInfo->ts = ts;
257,447✔
2582
    }
2583
    tsInfo->gId = *(int64_t*)colDataGetNumData(pColInfoDataUid, j);
534,489✔
2584
    ST_TASK_DLOG("%s get ts:%" PRId64 ", gId:%" PRIu64 ", ver:%" PRId64, __func__, tsInfo->ts, tsInfo->gId, tsRsp->ver);
534,489✔
2585
  }
2586

2587
end:
294,623✔
2588
  return code;
294,623✔
2589
}
2590

2591
static int32_t processTsOutPutOneGroup(SStreamTriggerReaderInfo* sStreamReaderInfo, SStreamTsResponse* tsRsp, SSDataBlock* pResBlock, int32_t order) {
85,644✔
2592
  int32_t code = 0;
85,644✔
2593
  int32_t lino = 0;
85,644✔
2594
  void* pTask = sStreamReaderInfo->pTask;
85,644✔
2595

2596
  tsRsp->tsInfo = taosArrayInit(1, sizeof(STsInfo));
85,646✔
2597
  STREAM_CHECK_NULL_GOTO(tsRsp->tsInfo, terrno);
85,646✔
2598
  STsInfo* tsInfo = taosArrayReserve(tsRsp->tsInfo, 1);
85,412✔
2599
  STREAM_CHECK_NULL_GOTO(tsInfo, terrno)
85,644✔
2600
  if (order == TSDB_ORDER_ASC) {
85,644✔
2601
    tsInfo->ts = INT64_MAX;
67,248✔
2602
  } else {
2603
    tsInfo->ts = INT64_MIN;
18,396✔
2604
  }
2605

2606
  SColumnInfoData* pColInfoDataTs = taosArrayGet(pResBlock->pDataBlock, 0);
85,878✔
2607
  SColumnInfoData* pColInfoDataUid = taosArrayGet(pResBlock->pDataBlock, 1);
85,412✔
2608
  for (int32_t j = 0; j < pResBlock->info.rows; j++) {
186,522✔
2609
    if (colDataIsNull_s(pColInfoDataTs, j) || pColInfoDataTs->pData == NULL) {
203,850✔
2610
      continue;
×
2611
    }
2612
    int64_t ts = *(int64_t*)colDataGetNumData(pColInfoDataTs, j);
101,574✔
2613
    if (order == TSDB_ORDER_ASC && ts < tsInfo->ts) {
101,576✔
2614
      tsInfo->ts = ts;
67,016✔
2615
    } else if (order == TSDB_ORDER_DESC && ts > tsInfo->ts) {
35,026✔
2616
      tsInfo->ts = ts;
18,396✔
2617
    }
2618
  }
2619
  int64_t uid = *(int64_t*)colDataGetNumData(pColInfoDataUid, 0);
84,712✔
2620
  tsInfo->gId = qStreamGetGroupIdFromSet(sStreamReaderInfo, uid);
85,878✔
2621
  ST_TASK_DLOG("%s get ts:%" PRId64 ", gId:%" PRIu64 ", ver:%" PRId64, __func__, tsInfo->ts, tsInfo->gId, tsRsp->ver);
85,878✔
2622

2623
end:
30,466✔
2624
  return code;
85,644✔
2625
}
2626

2627
static int32_t processTsOutPutAllGroups(SStreamTriggerReaderInfo* sStreamReaderInfo, SStreamTsResponse* tsRsp, SSDataBlock* pResBlock, int32_t order) {
7,427✔
2628
  int32_t code = 0;
7,427✔
2629
  int32_t lino = 0;
7,427✔
2630
  STableKeyInfo* pList = NULL;
7,427✔
2631
  StreamTableListInfo     tableInfo = {0};
7,427✔
2632

2633
  void* pTask = sStreamReaderInfo->pTask;
7,427✔
2634
  STREAM_CHECK_RET_GOTO(qStreamCopyTableInfo(sStreamReaderInfo, &tableInfo));
7,238✔
2635

2636
  SSHashObj*   uidTsHash = tSimpleHashInit(pResBlock->info.rows, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
7,427✔
2637
  STREAM_CHECK_NULL_GOTO(uidTsHash, terrno);
7,427✔
2638
  SColumnInfoData* pColInfoDataTs = taosArrayGet(pResBlock->pDataBlock, 0);
7,427✔
2639
  SColumnInfoData* pColInfoDataUid = taosArrayGet(pResBlock->pDataBlock, 1);
7,427✔
2640
  for (int32_t j = 0; j < pResBlock->info.rows; j++) {
22,978✔
2641
    if (colDataIsNull_s(pColInfoDataTs, j) || pColInfoDataTs->pData == NULL) {
31,102✔
2642
      continue;
×
2643
    }
2644
    int64_t ts = *(int64_t*)colDataGetNumData(pColInfoDataTs, j);
15,551✔
2645
    int64_t uid = *(int64_t*)colDataGetNumData(pColInfoDataUid, j);
15,551✔
2646
    STREAM_CHECK_RET_GOTO(tSimpleHashPut(uidTsHash, &uid, LONG_BYTES, &ts, LONG_BYTES));
15,551✔
2647
  }
2648
  tsRsp->tsInfo = taosArrayInit(qStreamGetTableListGroupNum(sStreamReaderInfo), sizeof(STsInfo));
7,427✔
2649
  STREAM_CHECK_NULL_GOTO(tsRsp->tsInfo, terrno);
7,427✔
2650
  while (true) {
15,143✔
2651
    int32_t        pNum = 0;
22,570✔
2652
    int64_t        suid = 0;
22,570✔
2653
    STREAM_CHECK_RET_GOTO(qStreamIterTableList(&tableInfo, &pList, &pNum, &suid));
22,570✔
2654
    if(pNum == 0) break;
22,570✔
2655
    STsInfo* tsInfo = taosArrayReserve(tsRsp->tsInfo, 1);
15,726✔
2656
    STREAM_CHECK_NULL_GOTO(tsInfo, terrno)
15,726✔
2657
    if (order == TSDB_ORDER_ASC) {
15,726✔
2658
      tsInfo->ts = INT64_MAX;
7,382✔
2659
    } else {
2660
      tsInfo->ts = INT64_MIN;
8,344✔
2661
    }
2662
    for (int32_t i = 0; i < pNum; i++) {
30,869✔
2663
      int64_t uid = pList[i].uid;
15,726✔
2664
      int64_t *ts = tSimpleHashGet(uidTsHash, &uid, LONG_BYTES);
15,726✔
2665
      STREAM_CHECK_NULL_GOTO(ts, terrno);
15,726✔
2666
      if (order == TSDB_ORDER_ASC && *ts < tsInfo->ts) {
15,143✔
2667
        tsInfo->ts = *ts;
7,382✔
2668
      } else if (order == TSDB_ORDER_DESC && *ts > tsInfo->ts) {
7,761✔
2669
        tsInfo->ts = *ts;
7,761✔
2670
      }
2671
    }
2672
    int64_t uid = pList[0].uid;
15,143✔
2673
    tsInfo->gId = qStreamGetGroupIdFromSet(sStreamReaderInfo, uid);
15,143✔
2674
    ST_TASK_DLOG("%s get ts:%" PRId64 ", gId:%" PRIu64 ", ver:%" PRId64, __func__, tsInfo->ts, tsInfo->gId, tsRsp->ver);
15,143✔
2675
    taosMemoryFreeClear(pList);
15,143✔
2676
  }
2677

2678
end:
7,427✔
2679
  qStreamDestroyTableInfo(&tableInfo);
7,427✔
2680
  taosMemoryFreeClear(pList);
7,427✔
2681
  tSimpleHashCleanup(uidTsHash);
7,427✔
2682
  return code;
7,427✔
2683
}
2684

2685
// static bool stReaderTaskWaitQuit(SStreamTask* pTask) { return taosHasRWWFlag(&pTask->entryLock); }
2686

2687
static int32_t getAllTs(SVnode* pVnode, SSDataBlock*  pResBlock, SStreamReaderTaskInner* pTaskInner, STableKeyInfo* pList, int32_t pNum) {
365,106✔
2688
  int32_t code = 0;
365,106✔
2689
  int32_t lino = 0;
365,106✔
2690

2691
  STREAM_CHECK_RET_GOTO(pTaskInner->storageApi->tsdReader.tsdCreateFirstLastTsIter(pVnode, &pTaskInner->options->twindows, &(SVersionRange){.minVer = -1, .maxVer = pTaskInner->options->ver},
365,106✔
2692
                                                pTaskInner->options->suid, pList, pNum, pTaskInner->options->order, &pTaskInner->pReader, pTaskInner->idStr));
2693
  bool hasNext = true;
365,340✔
2694
  while(1){
2695
    STREAM_CHECK_RET_GOTO(pTaskInner->storageApi->tsdReader.tsdNextFirstLastTsBlock(pTaskInner->pReader, pResBlock, &hasNext));
365,340✔
2696
    STREAM_CHECK_CONDITION_GOTO(!hasNext, TDB_CODE_SUCCESS);
365,106✔
2697
  }
2698

2699
end:
365,106✔
2700
  pTaskInner->storageApi->tsdReader.tsdDestroyFirstLastTsIter(pTaskInner->pReader);
365,340✔
2701
  pTaskInner->pReader = NULL;
364,624✔
2702
  return code;
364,624✔
2703
}
2704

2705
static int32_t processTsVTable(SVnode* pVnode, SStreamTsResponse* tsRsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
243,686✔
2706
                                  SStreamReaderTaskInner* pTaskInner) {
2707
  int32_t code = 0;
243,686✔
2708
  int32_t lino = 0;
243,686✔
2709
  STableKeyInfo* pList = NULL;
243,686✔
2710
  StreamTableListInfo     tableInfo = {0};
243,686✔
2711

2712
  void* pTask = sStreamReaderInfo->pTask;
243,686✔
2713
  STREAM_CHECK_RET_GOTO(qStreamCopyTableInfo(sStreamReaderInfo, &tableInfo));
243,686✔
2714

2715
  SSDataBlock*  pResBlock = NULL;
243,686✔
2716
  STREAM_CHECK_RET_GOTO(createDataBlockTsUid(&pResBlock, qStreamGetTableListNum(sStreamReaderInfo)));
243,686✔
2717

2718
  while (true) {
130,144✔
2719
    int32_t        pNum = 0;
373,830✔
2720
    int64_t        suid = 0;
373,830✔
2721
    STREAM_CHECK_RET_GOTO(qStreamIterTableList(&tableInfo, &pList, &pNum, &suid));
373,830✔
2722
    if(pNum == 0) break;
374,062✔
2723
    pTaskInner->options->suid = suid;
130,376✔
2724
    STREAM_CHECK_RET_GOTO(getAllTs(pVnode, pResBlock, pTaskInner, pList, pNum));
130,376✔
2725
    taosMemoryFreeClear(pList);
130,376✔
2726
  }
2727

2728
  STREAM_CHECK_RET_GOTO(processTsOutPutAllTables(sStreamReaderInfo, tsRsp, pResBlock, pTaskInner->options->order));
243,686✔
2729

2730
end:
243,454✔
2731
  qStreamDestroyTableInfo(&tableInfo);
243,686✔
2732
  taosMemoryFreeClear(pList);
243,686✔
2733
  blockDataDestroy(pResBlock);
243,686✔
2734
  STREAM_PRINT_LOG_END_WITHID(code, lino);
243,451✔
2735
  return code;
243,451✔
2736
}
2737

2738
static int32_t processTsNonVTable(SVnode* pVnode, SStreamTsResponse* tsRsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
226,125✔
2739
                                  SStreamReaderTaskInner* pTaskInner) {
2740
  int32_t code = 0;
226,125✔
2741
  int32_t lino = 0;
226,125✔
2742
  STableKeyInfo* pList = NULL;
226,125✔
2743
  void* pTask = sStreamReaderInfo->pTask;
226,125✔
2744
  
2745
  SSDataBlock*  pResBlock = NULL;
226,125✔
2746

2747
  int32_t        pNum = 0;
226,125✔
2748
  int64_t        suid = 0;
226,125✔
2749
  STREAM_CHECK_RET_GOTO(qStreamGetTableList(sStreamReaderInfo, 0, &pList, &pNum));
226,125✔
2750
  STREAM_CHECK_CONDITION_GOTO(pNum == 0, TSDB_CODE_SUCCESS);
226,125✔
2751
  STREAM_CHECK_RET_GOTO(createDataBlockTsUid(&pResBlock, pNum));
193,926✔
2752

2753
  pTaskInner->options->suid = sStreamReaderInfo->suid;
193,926✔
2754
  STREAM_CHECK_RET_GOTO(getAllTs(pVnode, pResBlock, pTaskInner, pList, pNum));
193,926✔
2755
  STREAM_CHECK_CONDITION_GOTO(pResBlock->info.rows == 0, TDB_CODE_SUCCESS);
193,678✔
2756
  int32_t order = pTaskInner->options->order;
103,204✔
2757

2758
  if (sStreamReaderInfo->groupByTbname) {
103,204✔
2759
    STREAM_CHECK_RET_GOTO(processTsOutPutAllTables(sStreamReaderInfo, tsRsp, pResBlock, order));
50,689✔
2760
  } else if (sStreamReaderInfo->partitionCols == NULL) {
52,078✔
2761
    STREAM_CHECK_RET_GOTO(processTsOutPutOneGroup(sStreamReaderInfo, tsRsp, pResBlock, order));
44,840✔
2762
  } else {
2763
    STREAM_CHECK_RET_GOTO(processTsOutPutAllGroups(sStreamReaderInfo, tsRsp, pResBlock, order));
7,427✔
2764
  }                             
2765
end:
225,635✔
2766
  blockDataDestroy(pResBlock);
225,874✔
2767
  taosMemoryFreeClear(pList);
225,886✔
2768
  STREAM_PRINT_LOG_END_WITHID(code, lino);
225,886✔
2769
  return code;
225,886✔
2770
}
2771

2772
static int32_t processTsOnce(SVnode* pVnode, SStreamTsResponse* tsRsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
54,052✔
2773
                                  SStreamReaderTaskInner* pTaskInner, uint64_t gid) {
2774
  int32_t code = 0;
54,052✔
2775
  int32_t lino = 0;
54,052✔
2776
  STableKeyInfo* pList = NULL;
54,052✔
2777
  void* pTask = sStreamReaderInfo->pTask;
54,052✔
2778
  
2779
  SSDataBlock*  pResBlock = NULL;
54,052✔
2780

2781
  int32_t        pNum = 0;
54,052✔
2782
  STREAM_CHECK_RET_GOTO(qStreamGetTableList(sStreamReaderInfo, gid, &pList, &pNum));
54,052✔
2783
  STREAM_CHECK_CONDITION_GOTO(pNum == 0, TSDB_CODE_SUCCESS);
54,286✔
2784
  STREAM_CHECK_RET_GOTO(createDataBlockTsUid(&pResBlock, pNum));
41,038✔
2785

2786
  pTaskInner->options->suid = sStreamReaderInfo->suid;
40,804✔
2787
  STREAM_CHECK_RET_GOTO(getAllTs(pVnode, pResBlock, pTaskInner, pList, pNum));
41,038✔
2788
  STREAM_CHECK_CONDITION_GOTO(pResBlock->info.rows == 0, TDB_CODE_SUCCESS);
41,038✔
2789
  int32_t order = pTaskInner->options->order;
40,804✔
2790

2791
  STREAM_CHECK_RET_GOTO(processTsOutPutOneGroup(sStreamReaderInfo, tsRsp, pResBlock, order));
40,806✔
2792
end:
54,052✔
2793
  blockDataDestroy(pResBlock);
54,052✔
2794
  taosMemoryFreeClear(pList);
54,052✔
2795
  STREAM_PRINT_LOG_END_WITHID(code, lino);
54,052✔
2796
  return code;
54,052✔
2797
}
2798

2799
static int32_t processTs(SVnode* pVnode, SStreamTsResponse* tsRsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
469,811✔
2800
                                  SStreamReaderTaskInner* pTaskInner) {
2801
  if (sStreamReaderInfo->isVtableStream) {
469,811✔
2802
    return processTsVTable(pVnode, tsRsp, sStreamReaderInfo, pTaskInner);
243,686✔
2803
  }
2804

2805
  return processTsNonVTable(pVnode, tsRsp, sStreamReaderInfo, pTaskInner);
226,125✔
2806
}
2807

2808
static int32_t vnodeProcessStreamSetTableReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
133,213✔
2809
  int32_t code = 0;
133,213✔
2810
  int32_t lino = 0;
133,213✔
2811
  void*   buf = NULL;
133,213✔
2812
  size_t  size = 0;
133,213✔
2813
  void* pTask = sStreamReaderInfo->pTask;
133,213✔
2814

2815
  ST_TASK_DLOG("vgId:%d %s start, trigger hash size:%d, calc hash size:%d", TD_VID(pVnode), __func__,
133,213✔
2816
                tSimpleHashGetSize(req->setTableReq.uidInfoTrigger), tSimpleHashGetSize(req->setTableReq.uidInfoCalc));
2817

2818
  taosWLockLatch(&sStreamReaderInfo->lock);
133,213✔
2819
  TSWAP(sStreamReaderInfo->uidHashTrigger, req->setTableReq.uidInfoTrigger);
133,213✔
2820
  TSWAP(sStreamReaderInfo->uidHashCalc, req->setTableReq.uidInfoCalc);
133,213✔
2821
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidHashTrigger, TSDB_CODE_INVALID_PARA);
133,213✔
2822
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidHashCalc, TSDB_CODE_INVALID_PARA);
133,213✔
2823

2824
  STREAM_CHECK_RET_GOTO(initStreamTableListInfo(&sStreamReaderInfo->vSetTableList));
133,213✔
2825
  STREAM_CHECK_RET_GOTO(qBuildVTableList(sStreamReaderInfo));
133,213✔
2826
end:
133,213✔
2827
  taosWUnLockLatch(&sStreamReaderInfo->lock);
133,213✔
2828
  STREAM_PRINT_LOG_END_WITHID(code, lino);
133,213✔
2829
  SRpcMsg rsp = {
133,213✔
2830
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
2831
  tmsgSendRsp(&rsp);
133,213✔
2832
  return code;
133,213✔
2833
}
2834

2835
static int32_t vnodeProcessStreamLastTsReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
284,887✔
2836
  int32_t                 code = 0;
284,887✔
2837
  int32_t                 lino = 0;
284,887✔
2838
  SStreamReaderTaskInner* pTaskInner = NULL;
284,887✔
2839
  SStreamTsResponse       tsRsp = {0};
285,104✔
2840
  void*                   buf = NULL;
285,104✔
2841
  size_t                  size = 0;
285,104✔
2842

2843
  void* pTask = sStreamReaderInfo->pTask;
285,104✔
2844

2845
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
285,104✔
2846

2847
  BUILD_OPTION(options, 0, sStreamReaderInfo->tableList.version, TSDB_ORDER_DESC, INT64_MIN, INT64_MAX, NULL, false, NULL);
285,104✔
2848
  STREAM_CHECK_RET_GOTO(createStreamTaskForTs(&options, &pTaskInner, &sStreamReaderInfo->storageApi));
285,104✔
2849

2850
  tsRsp.ver = sStreamReaderInfo->tableList.version + 1;
285,104✔
2851

2852
  STREAM_CHECK_RET_GOTO(processTs(pVnode, &tsRsp, sStreamReaderInfo, pTaskInner));
285,104✔
2853
  
2854
end:
284,869✔
2855
  ST_TASK_DLOG("vgId:%d %s get result size:%"PRIzu", ver:%"PRId64, TD_VID(pVnode), __func__, taosArrayGetSize(tsRsp.tsInfo), tsRsp.ver);
285,104✔
2856
  code = buildTsRsp(&tsRsp, &buf, &size);
285,104✔
2857
  STREAM_PRINT_LOG_END_WITHID(code, lino);
284,618✔
2858
  SRpcMsg rsp = {
284,869✔
2859
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
2860
  tmsgSendRsp(&rsp);
284,869✔
2861
  taosArrayDestroy(tsRsp.tsInfo);
285,104✔
2862
  taosMemoryFree(pTaskInner);
285,104✔
2863
  return code;
285,104✔
2864
}
2865

2866
static int32_t vnodeProcessStreamFirstTsReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
238,759✔
2867
  int32_t                 code = 0;
238,759✔
2868
  int32_t                 lino = 0;
238,759✔
2869
  SStreamReaderTaskInner* pTaskInner = NULL;
238,759✔
2870
  SStreamTsResponse       tsRsp = {0};
238,759✔
2871
  void*                   buf = NULL;
238,759✔
2872
  size_t                  size = 0;
238,759✔
2873

2874
  void* pTask = sStreamReaderInfo->pTask;
238,993✔
2875
  ST_TASK_DLOG("vgId:%d %s start, startTime:%"PRId64" ver:%"PRId64" gid:%"PRId64, TD_VID(pVnode), __func__, req->firstTsReq.startTime, req->firstTsReq.ver, req->firstTsReq.gid);
238,993✔
2876
  int32_t        pNum = 0;
238,993✔
2877

2878
  tsRsp.ver = pVnode->state.applied;
238,993✔
2879

2880
  BUILD_OPTION(options, 0, req->firstTsReq.ver, TSDB_ORDER_ASC, req->firstTsReq.startTime, INT64_MAX, NULL, false, NULL);
238,993✔
2881
  STREAM_CHECK_RET_GOTO(createStreamTaskForTs(&options, &pTaskInner, &sStreamReaderInfo->storageApi));
238,759✔
2882

2883
  if (req->firstTsReq.gid != 0) {
238,759✔
2884
    STREAM_CHECK_RET_GOTO(processTsOnce(pVnode, &tsRsp, sStreamReaderInfo, pTaskInner, req->firstTsReq.gid));
54,052✔
2885
  } else {
2886
    STREAM_CHECK_RET_GOTO(processTs(pVnode, &tsRsp, sStreamReaderInfo, pTaskInner));
184,707✔
2887
  }
2888

2889
end:
238,759✔
2890
  ST_TASK_DLOG("vgId:%d %s get result size:%"PRIzu", ver:%"PRId64, TD_VID(pVnode), __func__, taosArrayGetSize(tsRsp.tsInfo), tsRsp.ver);
238,993✔
2891
  code = buildTsRsp(&tsRsp, &buf, &size);
238,993✔
2892
  STREAM_PRINT_LOG_END_WITHID(code, lino);
237,829✔
2893
  SRpcMsg rsp = {
237,829✔
2894
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
2895
  tmsgSendRsp(&rsp);
237,595✔
2896
  taosArrayDestroy(tsRsp.tsInfo);
238,993✔
2897
  taosMemoryFree(pTaskInner);
238,993✔
2898
  return code;
238,993✔
2899
}
2900

2901
static int32_t vnodeProcessStreamTsdbMetaReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
440,048✔
2902
  int32_t code = 0;
440,048✔
2903
  int32_t lino = 0;
440,048✔
2904
  void*   buf = NULL;
440,048✔
2905
  size_t  size = 0;
440,048✔
2906
  STableKeyInfo* pList = NULL;
440,048✔
2907

2908
  void* pTask = sStreamReaderInfo->pTask;
440,048✔
2909
  ST_TASK_DLOG("vgId:%d %s start, ver:%" PRId64 ",skey:%" PRId64 ",ekey:%" PRId64 ",gid:%" PRId64, TD_VID(pVnode),
440,048✔
2910
               __func__, req->tsdbMetaReq.ver, req->tsdbMetaReq.startTime, req->tsdbMetaReq.endTime,
2911
               req->tsdbMetaReq.gid);
2912

2913
  SStreamReaderTaskInner* pTaskInner = NULL;
440,048✔
2914
  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_META);
440,048✔
2915

2916
  if (req->base.type == STRIGGER_PULL_TSDB_META) {
440,048✔
2917
    int32_t        pNum = 0;
440,048✔
2918
    STREAM_CHECK_RET_GOTO(qStreamGetTableList(sStreamReaderInfo, req->tsdbMetaReq.gid, &pList, &pNum));
440,048✔
2919
    BUILD_OPTION(options, getSuid(sStreamReaderInfo, pList), req->tsdbMetaReq.ver, req->tsdbMetaReq.order, req->tsdbMetaReq.startTime, req->tsdbMetaReq.endTime, 
440,048✔
2920
                          sStreamReaderInfo->tsSchemas, true, NULL);
2921
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, pList, pNum, &sStreamReaderInfo->storageApi));
440,048✔
2922
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
440,048✔
2923
    
2924
    STREAM_CHECK_RET_GOTO(createBlockForTsdbMeta(&pTaskInner->pResBlockDst, sStreamReaderInfo->isVtableStream));
440,048✔
2925
  } else {
2926
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
×
2927
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
×
2928
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
×
2929
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
×
2930
  }
2931

2932
  blockDataCleanup(pTaskInner->pResBlockDst);
440,048✔
2933
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pTaskInner->pResBlockDst, STREAM_RETURN_ROWS_NUM));
440,048✔
2934
  bool hasNext = true;
440,048✔
2935
  while (true) {
330,988✔
2936
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
771,036✔
2937
    if (!hasNext) {
771,036✔
2938
      break;
440,048✔
2939
    }
2940
    pTaskInner->storageApi->tsdReader.tsdReaderReleaseDataBlock(pTaskInner->pReader);
330,988✔
2941
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupIdFromSet(sStreamReaderInfo, pTaskInner->pResBlock->info.id.uid);
330,988✔
2942

2943
    int32_t index = 0;
330,988✔
2944
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.window.skey));
330,988✔
2945
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.window.ekey));
330,988✔
2946
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.id.uid));
330,988✔
2947
    if (!sStreamReaderInfo->isVtableStream) {
330,988✔
2948
      STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.id.groupId));
134,608✔
2949
    }
2950
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.rows));
330,988✔
2951

2952
    stDebug("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
330,988✔
2953
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
2954
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
2955
            pTaskInner->pResBlockDst->info.rows++;
330,988✔
2956
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
330,988✔
2957
      break;
×
2958
    }
2959
  }
2960

2961
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
440,048✔
2962
  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
440,048✔
2963
  printDataBlock(pTaskInner->pResBlockDst, __func__, "meta", ((SStreamTask *)sStreamReaderInfo->pTask)->streamId);
440,048✔
2964
  if (!hasNext) {
440,048✔
2965
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
440,048✔
2966
  }
2967

2968
end:
440,048✔
2969
  STREAM_PRINT_LOG_END_WITHID(code, lino);
440,048✔
2970
  SRpcMsg rsp = {
440,048✔
2971
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
2972
  tmsgSendRsp(&rsp);
440,048✔
2973
  taosMemoryFree(pList);
440,048✔
2974
  return code;
440,048✔
2975
}
2976

2977
static int32_t vnodeProcessStreamTsdbTsDataReqNonVTable(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
128,344✔
2978
  int32_t                 code = 0;
128,344✔
2979
  int32_t                 lino = 0;
128,344✔
2980
  SStreamReaderTaskInner* pTaskInner = NULL;
128,344✔
2981
  void*                   buf = NULL;
128,344✔
2982
  size_t                  size = 0;
128,344✔
2983
  SSDataBlock*            pBlockRes = NULL;
128,344✔
2984

2985
  void* pTask = sStreamReaderInfo->pTask;
128,344✔
2986
  ST_TASK_DLOG("vgId:%d %s start, ver:%"PRId64",skey:%"PRId64",ekey:%"PRId64",uid:%"PRId64",suid:%"PRId64, TD_VID(pVnode), __func__, req->tsdbTsDataReq.ver, 
128,344✔
2987
                req->tsdbTsDataReq.skey, req->tsdbTsDataReq.ekey, 
2988
                req->tsdbTsDataReq.uid, req->tsdbTsDataReq.suid);
2989

2990
  int32_t        pNum = 1;
128,344✔
2991
  STableKeyInfo  pList = {.groupId = qStreamGetGroupIdFromSet(sStreamReaderInfo, req->tsdbTsDataReq.uid), .uid = req->tsdbTsDataReq.uid};
128,344✔
2992
  STREAM_CHECK_CONDITION_GOTO(pList.groupId == -1, TSDB_CODE_INVALID_PARA);
128,344✔
2993
  BUILD_OPTION(options, getSuid(sStreamReaderInfo, &pList), req->tsdbTsDataReq.ver, TSDB_ORDER_ASC, req->tsdbTsDataReq.skey, req->tsdbTsDataReq.ekey,
128,344✔
2994
               sStreamReaderInfo->triggerCols, false, NULL);
2995
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, &pList, pNum, &sStreamReaderInfo->storageApi));
128,344✔
2996
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
128,344✔
2997
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->tsBlock, false, &pBlockRes));
128,344✔
2998

2999
  while (1) {
128,344✔
3000
    bool hasNext = false;
256,688✔
3001
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
256,688✔
3002
    if (!hasNext) {
256,688✔
3003
      break;
128,344✔
3004
    }
3005
    // if (!sStreamReaderInfo->isVtableStream){
3006
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupIdFromSet(sStreamReaderInfo, pTaskInner->pResBlock->info.id.uid);
128,344✔
3007
    // }
3008

3009
    SSDataBlock* pBlock = NULL;
128,344✔
3010
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
128,344✔
3011
    if (pBlock != NULL && pBlock->info.rows > 0) {
128,344✔
3012
      STREAM_CHECK_RET_GOTO(processTag(sStreamReaderInfo, false, pBlock->info.id.uid, pBlock,
128,344✔
3013
          0, pBlock->info.rows, 1));
3014
    }
3015
    
3016
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, sStreamReaderInfo->pFilterInfo, NULL));
128,344✔
3017
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
128,344✔
3018
    ST_TASK_DLOG("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
128,344✔
3019
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
3020
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
3021
  }
3022

3023
  blockDataTransform(pBlockRes, pTaskInner->pResBlockDst);
128,344✔
3024

3025
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
128,344✔
3026
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));
128,344✔
3027

3028
end:
128,344✔
3029
  STREAM_PRINT_LOG_END_WITHID(code, lino);
128,344✔
3030
  SRpcMsg rsp = {
128,344✔
3031
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
3032
  tmsgSendRsp(&rsp);
128,344✔
3033
  blockDataDestroy(pBlockRes);
128,344✔
3034

3035
  releaseStreamTask(&pTaskInner);
128,344✔
3036
  return code;
128,344✔
3037
}
3038

3039
static int32_t vnodeProcessStreamTsdbTsDataReqVTable(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
×
3040
  int32_t                 code = 0;
×
3041
  int32_t                 lino = 0;
×
3042
  SStreamReaderTaskInner* pTaskInner = NULL;
×
3043
  void*                   buf = NULL;
×
3044
  size_t                  size = 0;
×
3045
  SSDataBlock*            pBlockRes = NULL;
×
3046

3047
  void* pTask = sStreamReaderInfo->pTask;
×
3048
  ST_TASK_DLOG("vgId:%d %s start, ver:%"PRId64",skey:%"PRId64",ekey:%"PRId64",uid:%"PRId64",suid:%"PRId64, TD_VID(pVnode), __func__, req->tsdbTsDataReq.ver, 
×
3049
                req->tsdbTsDataReq.skey, req->tsdbTsDataReq.ekey, 
3050
                req->tsdbTsDataReq.uid, req->tsdbTsDataReq.suid);
3051

3052
  int32_t        pNum = 1;
×
3053
  STableKeyInfo  pList = {.groupId = qStreamGetGroupIdFromSet(sStreamReaderInfo, req->tsdbTsDataReq.uid), .uid = req->tsdbTsDataReq.uid};
×
3054
  STREAM_CHECK_CONDITION_GOTO(pList.groupId == -1, TSDB_CODE_INVALID_PARA);
×
3055
  BUILD_OPTION(options, getSuid(sStreamReaderInfo, &pList), req->tsdbTsDataReq.ver, TSDB_ORDER_ASC, req->tsdbTsDataReq.skey, req->tsdbTsDataReq.ekey,
×
3056
               sStreamReaderInfo->tsSchemas, true, NULL);
3057
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->tsBlock, &pList, pNum, &sStreamReaderInfo->storageApi));
×
3058
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->tsBlock, false, &pBlockRes));
×
3059

3060
  while (1) {
×
3061
    bool hasNext = false;
×
3062
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
×
3063
    if (!hasNext) {
×
3064
      break;
×
3065
    }
3066

3067
    SSDataBlock* pBlock = NULL;
×
3068
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
×
3069
    STREAM_CHECK_RET_GOTO(blockDataMerge(pBlockRes, pBlock));
×
3070
    ST_TASK_DLOG("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
×
3071
            TD_VID(pVnode), __func__, pBlockRes->info.window.skey, pBlockRes->info.window.ekey,
3072
            pBlockRes->info.id.uid, pBlockRes->info.id.groupId, pBlockRes->info.rows);
3073
  }
3074

3075
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlockRes->info.rows);
×
3076
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));
×
3077

3078
end:
×
3079
  STREAM_PRINT_LOG_END_WITHID(code, lino);
×
3080
  SRpcMsg rsp = {
×
3081
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
3082
  tmsgSendRsp(&rsp);
×
3083
  blockDataDestroy(pBlockRes);
×
3084

3085
  releaseStreamTask(&pTaskInner);
×
3086
  return code;
×
3087
}
3088

3089
static int32_t vnodeProcessStreamTsdbTriggerDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
112,056✔
3090
  int32_t code = 0;
112,056✔
3091
  int32_t lino = 0;
112,056✔
3092
  void*   buf = NULL;
112,056✔
3093
  size_t  size = 0;
112,056✔
3094
  STableKeyInfo* pList = NULL;
112,056✔
3095
  SArray*        pResList = NULL;
112,056✔
3096
  SSDataBlock*   pBlockTmp = NULL;
112,056✔
3097

3098
  SStreamReaderTaskInner* pTaskInner = NULL;
112,056✔
3099
  void* pTask = sStreamReaderInfo->pTask;
112,056✔
3100
  ST_TASK_DLOG("vgId:%d %s start. ver:%"PRId64",order:%d,startTs:%"PRId64",gid:%"PRId64, TD_VID(pVnode), __func__, req->tsdbTriggerDataReq.ver, req->tsdbTriggerDataReq.order, req->tsdbTriggerDataReq.startTime, req->tsdbTriggerDataReq.gid);
112,056✔
3101
  
3102
  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_TRIGGER_DATA);
112,056✔
3103

3104
  if (req->base.type == STRIGGER_PULL_TSDB_TRIGGER_DATA) {
112,290✔
3105
    int32_t        pNum = 0;
56,145✔
3106
    STREAM_CHECK_RET_GOTO(qStreamGetTableList(sStreamReaderInfo, req->tsdbTriggerDataReq.gid, &pList, &pNum));
56,145✔
3107
    BUILD_OPTION(options, getSuid(sStreamReaderInfo, pList), req->tsdbTriggerDataReq.ver, req->tsdbTriggerDataReq.order, req->tsdbTriggerDataReq.startTime, INT64_MAX,
56,145✔
3108
                 sStreamReaderInfo->triggerCols, false, NULL);
3109
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, pList, pNum, &sStreamReaderInfo->storageApi));
56,145✔
3110
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
55,911✔
3111
  } else {
3112
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
56,145✔
3113
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
56,145✔
3114
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
×
3115
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
×
3116
  }
3117

3118
  blockDataCleanup(pTaskInner->pResBlockDst);
56,145✔
3119
  bool hasNext = true;
56,145✔
3120
  int32_t totalRows = 0;
56,145✔
3121
    
3122
  pResList = taosArrayInit(4, POINTER_BYTES);
56,145✔
3123
  STREAM_CHECK_NULL_GOTO(pResList, terrno);
56,145✔
3124
  while (1) {
61,398✔
3125
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
117,543✔
3126
    if (!hasNext) {
117,543✔
3127
      break;
56,145✔
3128
    }
3129
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupIdFromSet(sStreamReaderInfo, pTaskInner->pResBlock->info.id.uid);
61,398✔
3130
    // pTaskInner->pResBlockDst->info.id.groupId = pTaskInner->pResBlock->info.id.groupId;
3131

3132
    SSDataBlock* pBlock = NULL;
61,398✔
3133
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
61,398✔
3134
    if (pBlock != NULL && pBlock->info.rows > 0) {
61,398✔
3135
      STREAM_CHECK_RET_GOTO(
61,398✔
3136
        processTag(sStreamReaderInfo, false, pBlock->info.id.uid, pBlock, 0, pBlock->info.rows, 1));
3137
    }
3138
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, sStreamReaderInfo->pFilterInfo, NULL));
61,398✔
3139
    // STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
3140
    ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlock->info.rows);
61,398✔
3141
    STREAM_CHECK_RET_GOTO(createOneDataBlock(pBlock, true, &pBlockTmp));
61,398✔
3142
    STREAM_CHECK_NULL_GOTO(taosArrayPush(pResList, &pBlockTmp), terrno);
61,398✔
3143
    totalRows += blockDataGetNumOfRows(pBlockTmp);
61,398✔
3144
    pBlockTmp = NULL;
61,398✔
3145

3146
    ST_TASK_DLOG("vgId:%d %s get skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
61,398✔
3147
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
3148
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
3149
    if (totalRows >= STREAM_RETURN_ROWS_NUM) {  //todo optimize send multi blocks in one group
61,398✔
3150
      break;
×
3151
    }
3152
  }
3153

3154
  STREAM_CHECK_RET_GOTO(buildArrayRsp(pResList, &buf, &size));
56,145✔
3155
  if (!hasNext) {
56,145✔
3156
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
56,145✔
3157
  }
3158

3159
end:
112,290✔
3160
  STREAM_PRINT_LOG_END_WITHID(code, lino);
111,826✔
3161
  SRpcMsg rsp = {
111,826✔
3162
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
3163
  tmsgSendRsp(&rsp);
112,058✔
3164
  taosMemoryFree(pList);
112,290✔
3165
  blockDataDestroy(pBlockTmp);
112,290✔
3166
  taosArrayDestroyP(pResList, (FDelete)blockDataDestroy);
112,290✔
3167
  return code;
112,290✔
3168
}
3169

3170
static int32_t vnodeProcessStreamTsdbCalcDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
6,204,040✔
3171
  int32_t code = 0;
6,204,040✔
3172
  int32_t lino = 0;
6,204,040✔
3173
  void*   buf = NULL;
6,204,040✔
3174
  size_t  size = 0;
6,204,452✔
3175
  SSDataBlock*   pBlockRes = NULL;
6,204,452✔
3176
  STableKeyInfo* pList = NULL;
6,204,452✔
3177

3178

3179
  void* pTask = sStreamReaderInfo->pTask;
6,204,452✔
3180
  ST_TASK_DLOG("vgId:%d %s start, skey:%"PRId64",ekey:%"PRId64",gid:%"PRId64",ver:%"PRId64, TD_VID(pVnode), __func__, 
6,204,452✔
3181
    req->tsdbCalcDataReq.skey, req->tsdbCalcDataReq.ekey, req->tsdbCalcDataReq.gid, req->tsdbCalcDataReq.ver);
3182

3183
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->triggerCols, TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN);
6,204,452✔
3184

3185
  SStreamReaderTaskInner* pTaskInner = NULL;
6,204,452✔
3186
  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_CALC_DATA);
6,204,452✔
3187

3188
  if (req->base.type == STRIGGER_PULL_TSDB_CALC_DATA) {
6,204,452✔
3189
    int32_t        pNum = 0;
6,204,452✔
3190
    STREAM_CHECK_RET_GOTO(qStreamGetTableList(sStreamReaderInfo, req->tsdbCalcDataReq.gid, &pList, &pNum));
6,204,452✔
3191
    BUILD_OPTION(options, getSuid(sStreamReaderInfo, pList), req->tsdbCalcDataReq.ver, TSDB_ORDER_ASC, req->tsdbCalcDataReq.skey, req->tsdbCalcDataReq.ekey,
6,204,452✔
3192
                 sStreamReaderInfo->triggerCols, false, NULL);
3193
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, pList, pNum, &sStreamReaderInfo->storageApi));
6,204,452✔
3194

3195
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
6,198,272✔
3196
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
6,203,216✔
3197
  } else {
3198
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
×
3199
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
×
3200
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
×
3201
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
×
3202
  }
3203

3204
  blockDataCleanup(pTaskInner->pResBlockDst);
6,199,285✔
3205
  bool hasNext = true;
6,194,169✔
3206
  while (1) {
738,477✔
3207
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
6,935,942✔
3208
    if (!hasNext) {
6,933,236✔
3209
      break;
6,197,448✔
3210
    }
3211
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupIdFromSet(sStreamReaderInfo, pTaskInner->pResBlock->info.id.uid);
735,788✔
3212

3213
    SSDataBlock* pBlock = NULL;
738,666✔
3214
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
738,666✔
3215
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, sStreamReaderInfo->pFilterInfo, NULL));
737,848✔
3216
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
738,237✔
3217
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
738,477✔
3218
      break;
×
3219
    }
3220
  }
3221

3222
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->calcResBlock, false, &pBlockRes));
6,194,770✔
3223
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlockRes, pTaskInner->pResBlockDst->info.capacity));
6,202,553✔
3224
  blockDataTransform(pBlockRes, pTaskInner->pResBlockDst);
6,199,920✔
3225
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));
6,200,916✔
3226
  printDataBlock(pBlockRes, __func__, "tsdb_calc_data", ((SStreamTask*)pTask)->streamId);
6,197,620✔
3227
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlockRes->info.rows);
6,195,309✔
3228
  printDataBlock(pBlockRes, __func__, "tsdb_data", ((SStreamTask*)pTask)->streamId);
6,201,946✔
3229

3230
  if (!hasNext) {
6,204,452✔
3231
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
6,204,452✔
3232
  }
3233

3234
end:
6,207,308✔
3235
  STREAM_PRINT_LOG_END_WITHID(code, lino);
6,202,347✔
3236
  SRpcMsg rsp = {
6,204,658✔
3237
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
3238
  tmsgSendRsp(&rsp);
6,204,452✔
3239
  blockDataDestroy(pBlockRes);
6,203,010✔
3240
  taosMemoryFree(pList);
6,203,216✔
3241
  return code;
6,203,216✔
3242
}
3243

3244
static int32_t vnodeProcessStreamTsdbVirtalDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
491,320✔
3245
  int32_t code = 0;
491,320✔
3246
  int32_t lino = 0;
491,320✔
3247
  void*   buf = NULL;
491,320✔
3248
  size_t  size = 0;
491,320✔
3249
  int32_t* slotIdList = NULL;
491,320✔
3250
  SArray* sortedCid = NULL;
491,320✔
3251
  SArray* schemas = NULL;
491,320✔
3252
  SSDataBlock*   pBlockRes = NULL;
491,320✔
3253
  
3254
  void* pTask = sStreamReaderInfo->pTask;
491,320✔
3255
  ST_TASK_DLOG("vgId:%d %s start, skey:%"PRId64",ekey:%"PRId64",uid:%"PRId64",ver:%"PRId64, TD_VID(pVnode), __func__, 
491,320✔
3256
    req->tsdbDataReq.skey, req->tsdbDataReq.ekey, req->tsdbDataReq.uid, req->tsdbDataReq.ver);
3257
    
3258
  SStreamReaderTaskInner* pTaskInner = NULL;
491,320✔
3259
  int64_t key = req->tsdbDataReq.uid;
491,320✔
3260

3261
  if (req->base.type == STRIGGER_PULL_TSDB_DATA) {
491,320✔
3262
    // sort cid and build slotIdList
3263
    slotIdList = taosMemoryMalloc(taosArrayGetSize(req->tsdbDataReq.cids) * sizeof(int32_t));
491,320✔
3264
    STREAM_CHECK_NULL_GOTO(slotIdList, terrno);
491,320✔
3265
    sortedCid = taosArrayDup(req->tsdbDataReq.cids, NULL);
491,320✔
3266
    STREAM_CHECK_NULL_GOTO(sortedCid, terrno);
491,320✔
3267
    taosArraySort(sortedCid, sortCid);
491,320✔
3268
    for (int32_t i = 0; i < taosArrayGetSize(req->tsdbDataReq.cids); i++) {
1,925,616✔
3269
      int16_t* cid = taosArrayGet(req->tsdbDataReq.cids, i);
1,434,296✔
3270
      STREAM_CHECK_NULL_GOTO(cid, terrno);
1,434,296✔
3271
      for (int32_t j = 0; j < taosArrayGetSize(sortedCid); j++) {
2,835,908✔
3272
        int16_t* cidSorted = taosArrayGet(sortedCid, j);
2,835,908✔
3273
        STREAM_CHECK_NULL_GOTO(cidSorted, terrno);
2,835,908✔
3274
        if (*cid == *cidSorted) {
2,835,908✔
3275
          slotIdList[j] = i;
1,434,296✔
3276
          break;
1,434,296✔
3277
        }
3278
      }
3279
    }
3280

3281
    STREAM_CHECK_RET_GOTO(buildScheamFromMeta(pVnode, req->tsdbDataReq.uid, &schemas, &sStreamReaderInfo->storageApi));
491,320✔
3282
    STREAM_CHECK_RET_GOTO(shrinkScheams(req->tsdbDataReq.cids, schemas));
489,028✔
3283
    STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, &pBlockRes));
489,028✔
3284

3285
    taosArraySort(schemas, sortSSchema);
489,028✔
3286
    BUILD_OPTION(options, req->tsdbDataReq.suid, req->tsdbDataReq.ver, req->tsdbDataReq.order, req->tsdbDataReq.skey,
489,028✔
3287
                    req->tsdbDataReq.ekey, schemas, true, &slotIdList);
3288
    STableKeyInfo       keyInfo = {.uid = req->tsdbDataReq.uid, .groupId = 0};
489,028✔
3289
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, pBlockRes, &keyInfo, 1, &sStreamReaderInfo->storageApi));
489,028✔
3290
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
489,028✔
3291
    pTaskInner->pResBlockDst = pBlockRes;
489,028✔
3292
    pBlockRes = NULL;
489,028✔
3293
  } else {
3294
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
×
3295
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
×
3296
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
×
3297
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
×
3298
  }
3299

3300
  blockDataCleanup(pTaskInner->pResBlockDst);
489,028✔
3301
  bool hasNext = true;
489,028✔
3302
  while (1) {
489,028✔
3303
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
978,056✔
3304
    if (!hasNext) {
978,056✔
3305
      break;
489,028✔
3306
    }
3307

3308
    SSDataBlock* pBlock = NULL;
489,028✔
3309
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
489,028✔
3310
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
489,028✔
3311
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
489,028✔
3312
      break;
×
3313
    }
3314
  }
3315
  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
489,028✔
3316
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
489,028✔
3317
  printDataBlock(pTaskInner->pResBlockDst, __func__, "tsdb_data", ((SStreamTask*)pTask)->streamId);
489,028✔
3318
  if (!hasNext) {
489,028✔
3319
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
489,028✔
3320
  }
3321

3322
end:
491,320✔
3323
  STREAM_PRINT_LOG_END_WITHID(code, lino);
491,320✔
3324
  SRpcMsg rsp = {
491,320✔
3325
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
3326
  tmsgSendRsp(&rsp);
491,320✔
3327
  taosMemFree(slotIdList);
491,320✔
3328
  taosArrayDestroy(sortedCid);
491,320✔
3329
  taosArrayDestroy(schemas);
491,320✔
3330
  blockDataDestroy(pBlockRes);
491,320✔
3331
  return code;
491,320✔
3332
}
3333

3334
static int32_t vnodeProcessStreamWalMetaNewReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
11,991,102✔
3335
  int32_t      code = 0;
11,991,102✔
3336
  int32_t      lino = 0;
11,991,102✔
3337
  void*        buf = NULL;
11,991,102✔
3338
  size_t       size = 0;
11,991,102✔
3339
  int64_t      lastVer = 0;
11,991,102✔
3340
  SSTriggerWalNewRsp resultRsp = {0};
11,991,102✔
3341

3342
  void* pTask = sStreamReaderInfo->pTask;
11,991,368✔
3343
  ST_TASK_DLOG("vgId:%d %s start, request paras lastVer:%" PRId64, TD_VID(pVnode), __func__, req->walMetaNewReq.lastVer);
11,989,506✔
3344

3345
  if (sStreamReaderInfo->metaBlock == NULL) {
11,990,270✔
3346
    STREAM_CHECK_RET_GOTO(createBlockForWalMetaNew((SSDataBlock**)&sStreamReaderInfo->metaBlock));
195,019✔
3347
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(sStreamReaderInfo->metaBlock, STREAM_RETURN_ROWS_NUM));
195,019✔
3348
  }
3349
  blockDataEmpty(sStreamReaderInfo->metaBlock);
11,992,769✔
3350
  resultRsp.metaBlock = sStreamReaderInfo->metaBlock;
11,990,036✔
3351
  resultRsp.ver = req->walMetaNewReq.lastVer;
11,990,268✔
3352
  STREAM_CHECK_RET_GOTO(processWalVerMetaNew(pVnode, &resultRsp, sStreamReaderInfo, req->walMetaNewReq.ctime));
11,990,967✔
3353

3354
  ST_TASK_DLOG("vgId:%d %s get result last ver:%"PRId64" rows:%d", TD_VID(pVnode), __func__, resultRsp.ver, resultRsp.totalRows);
11,990,242✔
3355
  STREAM_CHECK_CONDITION_GOTO(resultRsp.totalRows == 0, TDB_CODE_SUCCESS);
11,990,690✔
3356
  size = tSerializeSStreamWalDataResponse(NULL, 0, &resultRsp);
415,538✔
3357
  buf = rpcMallocCont(size);
415,363✔
3358
  size = tSerializeSStreamWalDataResponse(buf, size, &resultRsp);
415,538✔
3359
  printDataBlock(sStreamReaderInfo->metaBlock, __func__, "meta", ((SStreamTask*)pTask)->streamId);
415,538✔
3360
  printDataBlock(resultRsp.deleteBlock, __func__, "delete", ((SStreamTask*)pTask)->streamId);
415,324✔
3361
  printDataBlock(resultRsp.tableBlock, __func__, "table", ((SStreamTask*)pTask)->streamId);
415,321✔
3362

3363
end:
11,990,473✔
3364
  if (code == 0 && resultRsp.totalRows == 0) {
11,990,677✔
3365
    code = TSDB_CODE_STREAM_NO_DATA;
11,575,124✔
3366
    size = sizeof(int64_t) * 2;
11,575,124✔
3367
    buf = rpcMallocCont(size);
11,575,124✔
3368
    *(int64_t*)buf = resultRsp.ver;
11,573,278✔
3369
    *(((int64_t*)buf) + 1) = resultRsp.verTime;
11,573,278✔
3370
  }
3371
  SRpcMsg rsp = {
11,990,690✔
3372
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
3373
  tmsgSendRsp(&rsp);
11,990,085✔
3374
  if (code == TSDB_CODE_STREAM_NO_DATA){
11,991,965✔
3375
    code = 0;
11,576,370✔
3376
  }
3377
  STREAM_PRINT_LOG_END_WITHID(code, lino);
11,991,965✔
3378
  blockDataDestroy(resultRsp.deleteBlock);
11,992,576✔
3379
  blockDataDestroy(resultRsp.tableBlock);
11,991,309✔
3380

3381
  return code;
11,992,308✔
3382
}
3383
static int32_t vnodeProcessStreamWalMetaDataNewReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
3,367,275✔
3384
  int32_t      code = 0;
3,367,275✔
3385
  int32_t      lino = 0;
3,367,275✔
3386
  void*        buf = NULL;
3,367,275✔
3387
  size_t       size = 0;
3,367,275✔
3388
  SSTriggerWalNewRsp resultRsp = {0};
3,367,275✔
3389
  
3390
  void* pTask = sStreamReaderInfo->pTask;
3,368,003✔
3391
  ST_TASK_DLOG("vgId:%d %s start, request paras lastVer:%" PRId64, TD_VID(pVnode), __func__, req->walMetaDataNewReq.lastVer);
3,367,539✔
3392

3393
  if (sStreamReaderInfo->metaBlock == NULL) {
3,367,539✔
3394
    STREAM_CHECK_RET_GOTO(createBlockForWalMetaNew((SSDataBlock**)&sStreamReaderInfo->metaBlock));
98,501✔
3395
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(sStreamReaderInfo->metaBlock, STREAM_RETURN_ROWS_NUM));
98,501✔
3396
  }
3397

3398
  resultRsp.metaBlock = sStreamReaderInfo->metaBlock;
3,367,307✔
3399
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerBlock, false, (SSDataBlock**)&resultRsp.dataBlock));
3,367,533✔
3400
  resultRsp.ver = req->walMetaDataNewReq.lastVer;
3,365,404✔
3401
  resultRsp.checkAlter = true;
3,367,078✔
3402
  resultRsp.indexHash = tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
3,367,078✔
3403
  STREAM_CHECK_NULL_GOTO(resultRsp.indexHash, terrno);
3,365,873✔
3404

3405
  STREAM_CHECK_RET_GOTO(processWalVerMetaDataNew(pVnode, sStreamReaderInfo, &resultRsp));
3,365,873✔
3406

3407
  STREAM_CHECK_CONDITION_GOTO(resultRsp.totalRows == 0, TDB_CODE_SUCCESS);
3,363,819✔
3408
  size = tSerializeSStreamWalDataResponse(NULL, 0, &resultRsp);
126,288✔
3409
  buf = rpcMallocCont(size);
126,982✔
3410
  size = tSerializeSStreamWalDataResponse(buf, size, &resultRsp);
126,746✔
3411
  printDataBlock(sStreamReaderInfo->metaBlock, __func__, "meta", ((SStreamTask*)pTask)->streamId);
126,982✔
3412
  printDataBlock(resultRsp.dataBlock, __func__, "data", ((SStreamTask*)pTask)->streamId);
126,982✔
3413
  printDataBlock(resultRsp.deleteBlock, __func__, "delete", ((SStreamTask*)pTask)->streamId);
126,982✔
3414
  printDataBlock(resultRsp.tableBlock, __func__, "table", ((SStreamTask*)pTask)->streamId);
126,748✔
3415
  printIndexHash(resultRsp.indexHash, pTask);
126,748✔
3416

3417
end:
3,364,279✔
3418
  if (resultRsp.totalRows == 0) {
3,363,925✔
3419
    code = TSDB_CODE_STREAM_NO_DATA;
3,237,999✔
3420
    size = sizeof(int64_t) * 2;
3,237,999✔
3421
    buf = rpcMallocCont(size);
3,237,999✔
3422
    *(int64_t*)buf = resultRsp.ver;
3,238,109✔
3423
    *(((int64_t*)buf) + 1) = resultRsp.verTime;
3,238,109✔
3424
  }
3425
  SRpcMsg rsp = {
3,364,035✔
3426
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
3427
  tmsgSendRsp(&rsp);
3,363,939✔
3428
  if (code == TSDB_CODE_STREAM_NO_DATA){
3,367,606✔
3429
    code = 0;
3,240,746✔
3430
  }
3431
  blockDataDestroy(resultRsp.dataBlock);
3,367,606✔
3432
  blockDataDestroy(resultRsp.deleteBlock);
3,366,288✔
3433
  blockDataDestroy(resultRsp.tableBlock);
3,366,288✔
3434
  tSimpleHashCleanup(resultRsp.indexHash);
3,366,835✔
3435

3436
  STREAM_PRINT_LOG_END_WITHID(code, lino);
3,364,179✔
3437

3438
  return code;
3,366,066✔
3439
}
3440

3441
static int32_t vnodeProcessStreamWalDataNewReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
7,935,176✔
3442
  int32_t      code = 0;
7,935,176✔
3443
  int32_t      lino = 0;
7,935,176✔
3444
  void*        buf = NULL;
7,935,176✔
3445
  size_t       size = 0;
7,935,176✔
3446
  SSTriggerWalNewRsp resultRsp = {0};
7,935,176✔
3447

3448
  void* pTask = sStreamReaderInfo->pTask;
7,935,411✔
3449
  ST_TASK_DLOG("vgId:%d %s start, request paras size:%zu", TD_VID(pVnode), __func__, taosArrayGetSize(req->walDataNewReq.versions));
7,935,411✔
3450

3451
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerBlock, false, (SSDataBlock**)&resultRsp.dataBlock));
7,935,411✔
3452
  resultRsp.indexHash = tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
7,935,179✔
3453
  STREAM_CHECK_NULL_GOTO(resultRsp.indexHash, terrno);
7,933,710✔
3454
  resultRsp.uidHash = tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
7,933,710✔
3455
  STREAM_CHECK_NULL_GOTO(resultRsp.uidHash, terrno);
7,934,180✔
3456

3457
  STREAM_CHECK_RET_GOTO(processWalVerDataNew(pVnode, sStreamReaderInfo, req->walDataNewReq.versions, req->walDataNewReq.ranges, &resultRsp));
7,934,180✔
3458
  ST_TASK_DLOG("vgId:%d %s get result last ver:%"PRId64" rows:%d", TD_VID(pVnode), __func__, resultRsp.ver, resultRsp.totalRows);
7,934,712✔
3459

3460
  STREAM_CHECK_CONDITION_GOTO(resultRsp.totalRows == 0, TDB_CODE_SUCCESS);
7,934,477✔
3461

3462
  size = tSerializeSStreamWalDataResponse(NULL, 0, &resultRsp);
237,559✔
3463
  buf = rpcMallocCont(size);
237,559✔
3464
  size = tSerializeSStreamWalDataResponse(buf, size, &resultRsp);
237,559✔
3465
  printDataBlock(resultRsp.dataBlock, __func__, "data", ((SStreamTask*)pTask)->streamId);
237,559✔
3466
  printIndexHash(resultRsp.indexHash, pTask);
237,559✔
3467

3468
end:
7,934,477✔
3469
  if (resultRsp.totalRows == 0) {
7,934,477✔
3470
    buf = rpcMallocCont(sizeof(int64_t));
7,697,617✔
3471
    *(int64_t *)buf = resultRsp.ver;
7,695,524✔
3472
    size = sizeof(int64_t);
7,695,524✔
3473
    code = TSDB_CODE_STREAM_NO_DATA;
7,695,524✔
3474
  }
3475
  SRpcMsg rsp = {
7,932,384✔
3476
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
3477
  tmsgSendRsp(&rsp);
7,934,014✔
3478
  if (code == TSDB_CODE_STREAM_NO_DATA){
7,935,411✔
3479
    code = 0;
7,697,852✔
3480
  }
3481

3482
  blockDataDestroy(resultRsp.dataBlock);
7,935,411✔
3483
  blockDataDestroy(resultRsp.deleteBlock);
7,934,415✔
3484
  blockDataDestroy(resultRsp.tableBlock);
7,934,947✔
3485
  tSimpleHashCleanup(resultRsp.indexHash);
7,934,947✔
3486
  tSimpleHashCleanup(resultRsp.uidHash);
7,934,539✔
3487
  STREAM_PRINT_LOG_END_WITHID(code, lino);
7,934,415✔
3488

3489
  return code;
7,935,383✔
3490
}
3491

3492
static int32_t vnodeProcessStreamWalCalcDataNewReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
986,961✔
3493
  int32_t      code = 0;
986,961✔
3494
  int32_t      lino = 0;
986,961✔
3495
  void*        buf = NULL;
986,961✔
3496
  size_t       size = 0;
986,961✔
3497
  SSTriggerWalNewRsp resultRsp = {0};
986,961✔
3498
  SSDataBlock* pBlock1 = NULL;
986,961✔
3499
  SSDataBlock* pBlock2 = NULL;
986,961✔
3500
  
3501
  void* pTask = sStreamReaderInfo->pTask;
986,961✔
3502
  ST_TASK_DLOG("vgId:%d %s start, request paras size:%zu", TD_VID(pVnode), __func__, taosArrayGetSize(req->walDataNewReq.versions));
986,961✔
3503

3504
  SSDataBlock* dataBlock = sStreamReaderInfo->isVtableStream ? sStreamReaderInfo->calcBlock : sStreamReaderInfo->triggerBlock;
986,961✔
3505
  STREAM_CHECK_RET_GOTO(createOneDataBlock(dataBlock, false, (SSDataBlock**)&resultRsp.dataBlock));
986,961✔
3506
  resultRsp.isCalc = sStreamReaderInfo->isVtableStream ? true : false;
985,433✔
3507
  resultRsp.indexHash = tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
985,433✔
3508
  STREAM_CHECK_NULL_GOTO(resultRsp.indexHash, terrno);
986,197✔
3509
  resultRsp.uidHash = tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
986,197✔
3510
  STREAM_CHECK_NULL_GOTO(resultRsp.uidHash, terrno);
985,433✔
3511

3512
  STREAM_CHECK_RET_GOTO(processWalVerDataNew(pVnode, sStreamReaderInfo, req->walDataNewReq.versions, req->walDataNewReq.ranges, &resultRsp));
985,433✔
3513
  STREAM_CHECK_CONDITION_GOTO(resultRsp.totalRows == 0, TDB_CODE_SUCCESS);
986,961✔
3514

3515
  if (!sStreamReaderInfo->isVtableStream){
246,883✔
3516
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->calcBlock, false, &pBlock2));
146,928✔
3517
  
3518
    blockDataTransform(pBlock2, resultRsp.dataBlock);
146,928✔
3519
    blockDataDestroy(resultRsp.dataBlock);
146,928✔
3520
    resultRsp.dataBlock = pBlock2;
146,928✔
3521
    pBlock2 = NULL;
146,928✔
3522
  }
3523

3524
  size = tSerializeSStreamWalDataResponse(NULL, 0, &resultRsp);
246,883✔
3525
  buf = rpcMallocCont(size);
246,883✔
3526
  size = tSerializeSStreamWalDataResponse(buf, size, &resultRsp);
246,883✔
3527
  printDataBlock(resultRsp.dataBlock, __func__, "data", ((SStreamTask*)pTask)->streamId);
246,883✔
3528
  printIndexHash(resultRsp.indexHash, pTask);
246,439✔
3529

3530
end:
986,961✔
3531
  if (resultRsp.totalRows == 0) {
986,961✔
3532
    buf = rpcMallocCont(sizeof(int64_t));
740,078✔
3533
    *(int64_t *)buf = resultRsp.ver;
740,078✔
3534
    size = sizeof(int64_t);
740,078✔
3535
    code = TSDB_CODE_STREAM_NO_DATA;
740,078✔
3536
  }
3537
  SRpcMsg rsp = {
986,961✔
3538
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
3539
  tmsgSendRsp(&rsp);
986,726✔
3540
  if (code == TSDB_CODE_STREAM_NO_DATA){
986,961✔
3541
    code = 0;
740,078✔
3542
  }
3543

3544
  blockDataDestroy(pBlock1);
986,961✔
3545
  blockDataDestroy(pBlock2);
986,197✔
3546
  blockDataDestroy(resultRsp.dataBlock);
986,961✔
3547
  blockDataDestroy(resultRsp.deleteBlock);
986,961✔
3548
  blockDataDestroy(resultRsp.tableBlock);
986,961✔
3549
  tSimpleHashCleanup(resultRsp.indexHash);
986,961✔
3550
  tSimpleHashCleanup(resultRsp.uidHash);
986,197✔
3551
  STREAM_PRINT_LOG_END_WITHID(code, lino);
986,197✔
3552

3553
  return code;
986,961✔
3554
}
3555

3556
static int32_t vnodeProcessStreamGroupColValueReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
282,087✔
3557
  int32_t code = 0;
282,087✔
3558
  int32_t lino = 0;
282,087✔
3559
  void*   buf = NULL;
282,087✔
3560
  size_t  size = 0;
282,087✔
3561
  SArray** gInfo = NULL;
282,087✔
3562
  
3563
  void* pTask = sStreamReaderInfo->pTask;
282,087✔
3564
  ST_TASK_DLOG("vgId:%d %s start, request gid:%" PRId64, TD_VID(pVnode), __func__, req->groupColValueReq.gid);
282,087✔
3565

3566
  gInfo = taosHashAcquire(sStreamReaderInfo->groupIdMap, &req->groupColValueReq.gid, POINTER_BYTES);
282,087✔
3567
  STREAM_CHECK_NULL_GOTO(gInfo, TSDB_CODE_STREAM_NO_CONTEXT);
282,087✔
3568
  SStreamGroupInfo pGroupInfo = {0};
282,087✔
3569
  pGroupInfo.gInfo = *gInfo;
282,087✔
3570

3571
  size = tSerializeSStreamGroupInfo(NULL, 0, &pGroupInfo, TD_VID(pVnode));
282,087✔
3572
  STREAM_CHECK_CONDITION_GOTO(size < 0, size);
3573
  buf = rpcMallocCont(size);
282,087✔
3574
  STREAM_CHECK_NULL_GOTO(buf, terrno);
282,087✔
3575
  size = tSerializeSStreamGroupInfo(buf, size, &pGroupInfo, TD_VID(pVnode));
282,087✔
3576
  STREAM_CHECK_CONDITION_GOTO(size < 0, size);
3577
end:
282,087✔
3578
  taosHashRelease(sStreamReaderInfo->groupIdMap, gInfo);
282,087✔
3579
  if (code != 0) {
281,869✔
3580
    rpcFreeCont(buf);
×
3581
    buf = NULL;
×
3582
    size = 0;
×
3583
  }
3584
  STREAM_PRINT_LOG_END_WITHID(code, lino);
281,869✔
3585
  SRpcMsg rsp = {
282,087✔
3586
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
3587
  tmsgSendRsp(&rsp);
282,087✔
3588

3589
  return code;
282,087✔
3590
}
3591

3592
static int32_t setVtableInfo(SVnode* pVnode, SArray* infos, SArray* cids, int64_t uid, uint64_t gid, SMetaReader* metaReader, SStreamTriggerReaderInfo* sStreamReaderInfo) {
185,729✔
3593
  int32_t              code = 0;
185,729✔
3594
  int32_t              lino = 0;
185,729✔
3595
  void* pTask = sStreamReaderInfo->pTask;
185,729✔
3596

3597
  VTableInfo* vTable = taosArrayReserve(infos, 1);
185,729✔
3598
  STREAM_CHECK_NULL_GOTO(vTable, terrno);
185,729✔
3599
  vTable->uid = uid;
185,729✔
3600
  vTable->gId = gid;
185,729✔
3601

3602
  ST_TASK_DLOG("vgId:%d %s put vtable uid:%"PRId64, TD_VID(pVnode), __func__, uid);
185,729✔
3603

3604
  code = sStreamReaderInfo->storageApi.metaReaderFn.getTableEntryByVersionUid(metaReader, sStreamReaderInfo->tableList.version, uid);
185,729✔
3605
  if (code != 0) {
185,729✔
3606
    ST_TASK_ELOG("vgId:%d %s get table entry by uid:%"PRId64" failed, msg:%s", TD_VID(pVnode), __func__, uid, tstrerror(code));
×
3607
    goto end;
×
3608
  }
3609
  if (atomic_load_8(&sStreamReaderInfo->isVtableOnlyTs) == 1) {
185,729✔
3610
    vTable->cols.nCols = metaReader->me.colRef.nCols;
10,748✔
3611
    vTable->cols.version = metaReader->me.colRef.version;
10,748✔
3612
    vTable->cols.pColRef = taosMemoryCalloc(metaReader->me.colRef.nCols, sizeof(SColRef));
10,748✔
3613
    STREAM_CHECK_NULL_GOTO(vTable->cols.pColRef, terrno);
10,748✔
3614
    for (size_t j = 0; j < metaReader->me.colRef.nCols; j++) {
64,488✔
3615
      memcpy(vTable->cols.pColRef + j, &metaReader->me.colRef.pColRef[j], sizeof(SColRef));
53,740✔
3616
    }
3617
  } else {
3618
    vTable->cols.nCols = taosArrayGetSize(cids);
174,981✔
3619
    vTable->cols.version = metaReader->me.colRef.version;
174,981✔
3620
    vTable->cols.pColRef = taosMemoryCalloc(taosArrayGetSize(cids), sizeof(SColRef));
174,981✔
3621
    STREAM_CHECK_NULL_GOTO(vTable->cols.pColRef, terrno);
174,981✔
3622
    for (size_t i = 0; i < taosArrayGetSize(cids); i++) {
665,972✔
3623
      for (size_t j = 0; j < metaReader->me.colRef.nCols; j++) {
2,067,641✔
3624
        if (metaReader->me.colRef.pColRef[j].hasRef &&
1,891,970✔
3625
            metaReader->me.colRef.pColRef[j].id == *(col_id_t*)taosArrayGet(cids, i)) {
1,396,641✔
3626
          memcpy(vTable->cols.pColRef + i, &metaReader->me.colRef.pColRef[j], sizeof(SColRef));
315,320✔
3627
          break;
315,320✔
3628
        }
3629
      }
3630
    }
3631
  }
3632
  tDecoderClear(&metaReader->coder);
185,729✔
3633

3634
end:
185,729✔
3635
  return code;
185,729✔
3636
}
3637

3638
static int32_t getAllVinfo(SVnode* pVnode, SStreamMsgVTableInfo* vTableInfo, SArray* cids, SMetaReader* metaReader, SStreamTriggerReaderInfo* sStreamReaderInfo){
75,475✔
3639
  int32_t              code = 0;
75,475✔
3640
  int32_t              lino = 0;
75,475✔
3641
  void* pTask = sStreamReaderInfo->pTask;
75,475✔
3642
  SArray*              pTableListArray = NULL;
75,475✔
3643

3644

3645
  pTableListArray = qStreamGetTableArrayList(sStreamReaderInfo);
75,475✔
3646
  STREAM_CHECK_NULL_GOTO(pTableListArray, terrno);
75,475✔
3647

3648
  vTableInfo->infos = taosArrayInit(taosArrayGetSize(pTableListArray), sizeof(VTableInfo));
75,475✔
3649
  STREAM_CHECK_NULL_GOTO(vTableInfo->infos, terrno);
75,475✔
3650

3651
  for (size_t i = 0; i < taosArrayGetSize(pTableListArray); i++) {
261,204✔
3652
    SStreamTableKeyInfo* pKeyInfo = taosArrayGetP(pTableListArray, i);
185,729✔
3653
    if (pKeyInfo == NULL || pKeyInfo->markedDeleted) {
185,729✔
3654
      continue;
×
3655
    }
3656
    code = setVtableInfo(pVnode, vTableInfo->infos, cids, pKeyInfo->uid, pKeyInfo->groupId, metaReader, sStreamReaderInfo);
185,729✔
3657
    if (code != 0) {
185,729✔
3658
      ST_TASK_WLOG("vgId:%d %s set vtable info uid:%"PRId64" failed, msg:%s", TD_VID(pVnode), __func__, pKeyInfo->uid, tstrerror(code));
×
3659
      code = 0;
×
3660
      continue;
×
3661
    }
3662
  }
3663

3664
end:
75,475✔
3665
  taosArrayDestroyP(pTableListArray, taosMemFree);
75,475✔
3666
  return code;
75,475✔
3667
}
3668

3669
static int32_t getSpicificVinfo(SVnode* pVnode, SStreamMsgVTableInfo* vTableInfo, SArray* uids, SArray* cids, SMetaReader* metaReader, SStreamTriggerReaderInfo* sStreamReaderInfo){
×
3670
  int32_t              code = 0;
×
3671
  int32_t              lino = 0;
×
3672
  void* pTask = sStreamReaderInfo->pTask;
×
3673

3674
  vTableInfo->infos = taosArrayInit(taosArrayGetSize(uids), sizeof(VTableInfo));
×
3675
  STREAM_CHECK_NULL_GOTO(vTableInfo->infos, terrno);
×
3676

3677
  for (size_t i = 0; i < taosArrayGetSize(uids); i++) {
×
3678
    int64_t* uid = taosArrayGet(uids, i);
×
3679
    STREAM_CHECK_NULL_GOTO(uid, terrno);
×
3680

3681
    taosRLockLatch(&sStreamReaderInfo->lock);
×
3682
    uint64_t groupId = qStreamGetGroupIdFromOrigin(sStreamReaderInfo, *uid);
×
3683
    taosRUnLockLatch(&sStreamReaderInfo->lock);
×
3684
    if (groupId == -1) {
×
3685
      ST_TASK_WLOG("vgId:%d %s uid:%"PRId64" not found in stream group", TD_VID(pVnode), __func__, *uid);
×
3686
      continue;
×
3687
    }
3688
    code = setVtableInfo(pVnode, vTableInfo->infos, cids, *uid, groupId, metaReader, sStreamReaderInfo);
×
3689
    if (code != 0) {
×
3690
      ST_TASK_WLOG("vgId:%d %s set vtable info uid:%"PRId64" failed, msg:%s", TD_VID(pVnode), __func__, *uid, tstrerror(code));
×
3691
      code = 0;
×
3692
      continue;
×
3693
    }
3694
  }
3695
  
3696
end:
×
3697
  return code;
×
3698
}
3699

3700
static int32_t vnodeProcessStreamVTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
75,475✔
3701
  int32_t              code = 0;
75,475✔
3702
  int32_t              lino = 0;
75,475✔
3703
  void*                buf = NULL;
75,475✔
3704
  size_t               size = 0;
75,475✔
3705
  SStreamMsgVTableInfo vTableInfo = {0};
75,475✔
3706
  SMetaReader          metaReader = {0};
75,475✔
3707

3708
  void* pTask = sStreamReaderInfo->pTask;
75,475✔
3709
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
75,475✔
3710

3711
  SArray* cids = req->virTableInfoReq.cids;
75,475✔
3712
  STREAM_CHECK_NULL_GOTO(cids, terrno);
75,475✔
3713

3714
  if (taosArrayGetSize(cids) == 1 && *(col_id_t*)taosArrayGet(cids, 0) == PRIMARYKEY_TIMESTAMP_COL_ID){
75,475✔
3715
    (void)atomic_val_compare_exchange_8(&sStreamReaderInfo->isVtableOnlyTs, 0, 1);
10,748✔
3716
  }
3717
  sStreamReaderInfo->storageApi.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &sStreamReaderInfo->storageApi.metaFn);
75,475✔
3718

3719
  if (req->virTableInfoReq.fetchAllTable || req->virTableInfoReq.uids == NULL || taosArrayGetSize(req->virTableInfoReq.uids) == 0) {
75,475✔
3720
    STREAM_CHECK_RET_GOTO(getAllVinfo(pVnode, &vTableInfo, cids, &metaReader, sStreamReaderInfo));
75,475✔
3721
  } else {
3722
    STREAM_CHECK_RET_GOTO(getSpicificVinfo(pVnode, &vTableInfo, req->virTableInfoReq.uids, cids, &metaReader, sStreamReaderInfo));
×
3723
  }
3724
  ST_TASK_DLOG("vgId:%d %s end, size:%"PRIzu, TD_VID(pVnode), __func__, taosArrayGetSize(vTableInfo.infos));
75,475✔
3725
  STREAM_CHECK_RET_GOTO(buildVTableInfoRsp(&vTableInfo, &buf, &size));
75,475✔
3726

3727
end:
75,475✔
3728
  tDestroySStreamMsgVTableInfo(&vTableInfo);
75,475✔
3729
  sStreamReaderInfo->storageApi.metaReaderFn.clearReader(&metaReader);
75,475✔
3730
  STREAM_PRINT_LOG_END_WITHID(code, lino);
75,475✔
3731
  SRpcMsg rsp = {
75,475✔
3732
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
3733
  tmsgSendRsp(&rsp);
75,475✔
3734
  return code;
75,475✔
3735
}
3736

3737
static int32_t vnodeProcessStreamOTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
133,213✔
3738
  int32_t                   code = 0;
133,213✔
3739
  int32_t                   lino = 0;
133,213✔
3740
  void*                     buf = NULL;
133,213✔
3741
  size_t                    size = 0;
133,213✔
3742
  SSTriggerOrigTableInfoRsp oTableInfo = {0};
133,213✔
3743
  SMetaReader               metaReader = {0};
133,213✔
3744
  void*                     pTask = sStreamReaderInfo->pTask;
133,213✔
3745

3746
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
133,213✔
3747

3748
  SArray* cols = req->origTableInfoReq.cols;
133,213✔
3749
  STREAM_CHECK_NULL_GOTO(cols, terrno);
133,213✔
3750

3751
  oTableInfo.cols = taosArrayInit(taosArrayGetSize(cols), sizeof(OTableInfoRsp));
133,213✔
3752

3753
  STREAM_CHECK_NULL_GOTO(oTableInfo.cols, terrno);
133,213✔
3754

3755
  sStreamReaderInfo->storageApi.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &sStreamReaderInfo->storageApi.metaFn);
133,213✔
3756
  for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
491,525✔
3757
    OTableInfo*    oInfo = taosArrayGet(cols, i);
358,312✔
3758
    OTableInfoRsp* vTableInfo = taosArrayReserve(oTableInfo.cols, 1);
358,312✔
3759
    STREAM_CHECK_NULL_GOTO(oInfo, terrno);
358,312✔
3760
    STREAM_CHECK_NULL_GOTO(vTableInfo, terrno);
358,312✔
3761
    code = sStreamReaderInfo->storageApi.metaReaderFn.getTableEntryByName(&metaReader, oInfo->refTableName);
358,312✔
3762
    if (code != 0) {
358,312✔
3763
      code = 0;
204✔
3764
      ST_TASK_ELOG("vgId:%d %s get table entry by name:%s failed, msg:%s", TD_VID(pVnode), __func__, oInfo->refTableName, tstrerror(code));
204✔
3765
      continue;
204✔
3766
    }
3767
    vTableInfo->uid = metaReader.me.uid;
358,108✔
3768
    ST_TASK_DLOG("vgId:%d %s get original uid:%"PRId64, TD_VID(pVnode), __func__, vTableInfo->uid);
358,108✔
3769

3770
    SSchemaWrapper* sSchemaWrapper = NULL;
358,108✔
3771
    if (metaReader.me.type == TD_CHILD_TABLE) {
358,108✔
3772
      int64_t suid = metaReader.me.ctbEntry.suid;
356,266✔
3773
      vTableInfo->suid = suid;
356,266✔
3774
      tDecoderClear(&metaReader.coder);
356,266✔
3775
      STREAM_CHECK_RET_GOTO(sStreamReaderInfo->storageApi.metaReaderFn.getTableEntryByUid(&metaReader, suid));
356,266✔
3776
      sSchemaWrapper = &metaReader.me.stbEntry.schemaRow;
356,266✔
3777
    } else if (metaReader.me.type == TD_NORMAL_TABLE) {
1,842✔
3778
      vTableInfo->suid = 0;
1,842✔
3779
      sSchemaWrapper = &metaReader.me.ntbEntry.schemaRow;
1,842✔
3780
    } else {
3781
      ST_TASK_ELOG("invalid table type:%d", metaReader.me.type);
×
3782
    }
3783

3784
    for (size_t j = 0; j < sSchemaWrapper->nCols; j++) {
1,225,926✔
3785
      SSchema* s = sSchemaWrapper->pSchema + j;
1,226,161✔
3786
      if (strcmp(s->name, oInfo->refColName) == 0) {
1,226,161✔
3787
        vTableInfo->cid = s->colId;
358,108✔
3788
        break;
358,108✔
3789
      }
3790
    }
3791
    if (vTableInfo->cid == 0) {
358,108✔
3792
      stError("vgId:%d %s, not found col %s in table %s", TD_VID(pVnode), __func__, oInfo->refColName,
×
3793
              oInfo->refTableName);
3794
    }
3795
    tDecoderClear(&metaReader.coder);
358,108✔
3796
  }
3797

3798
  STREAM_CHECK_RET_GOTO(buildOTableInfoRsp(&oTableInfo, &buf, &size));
133,213✔
3799

3800
end:
133,213✔
3801
  tDestroySTriggerOrigTableInfoRsp(&oTableInfo);
133,213✔
3802
  sStreamReaderInfo->storageApi.metaReaderFn.clearReader(&metaReader);
133,213✔
3803
  STREAM_PRINT_LOG_END_WITHID(code, lino);
132,978✔
3804
  SRpcMsg rsp = {
132,978✔
3805
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
3806
  tmsgSendRsp(&rsp);
133,213✔
3807
  return code;
133,213✔
3808
}
3809

3810
static int32_t vnodeProcessStreamVTableTagInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
867,616✔
3811
  int32_t                   code = 0;
867,616✔
3812
  int32_t                   lino = 0;
867,616✔
3813
  void*                     buf = NULL;
867,616✔
3814
  size_t                    size = 0;
867,616✔
3815
  SSDataBlock* pBlock = NULL;
867,616✔
3816

3817
  SMetaReader               metaReader = {0};
867,616✔
3818
  SMetaReader               metaReaderStable = {0};
867,616✔
3819
  int64_t streamId = req->base.streamId;
867,616✔
3820
  stsDebug("vgId:%d %s start", TD_VID(pVnode), __func__);
867,616✔
3821

3822
  SArray* cols = req->virTablePseudoColReq.cids;
867,616✔
3823
  STREAM_CHECK_NULL_GOTO(cols, terrno);
867,616✔
3824

3825
  sStreamReaderInfo->storageApi.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &sStreamReaderInfo->storageApi.metaFn);
867,616✔
3826
  STREAM_CHECK_RET_GOTO(sStreamReaderInfo->storageApi.metaReaderFn.getTableEntryByUid(&metaReader, req->virTablePseudoColReq.uid));
867,616✔
3827

3828
  STREAM_CHECK_CONDITION_GOTO(metaReader.me.type != TD_VIRTUAL_CHILD_TABLE && metaReader.me.type != TD_VIRTUAL_NORMAL_TABLE, TSDB_CODE_INVALID_PARA);
867,616✔
3829

3830
  STREAM_CHECK_RET_GOTO(createDataBlock(&pBlock));
867,616✔
3831
  if (metaReader.me.type == TD_VIRTUAL_NORMAL_TABLE) {
867,616✔
3832
    STREAM_CHECK_CONDITION_GOTO (taosArrayGetSize(cols) < 1 || *(col_id_t*)taosArrayGet(cols, 0) != -1, TSDB_CODE_INVALID_PARA);
2,084✔
3833
    SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN, -1);
2,084✔
3834
    STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
2,084✔
3835
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, 1));
2,084✔
3836
    pBlock->info.rows = 1;
2,084✔
3837
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, 0);
2,084✔
3838
    STREAM_CHECK_NULL_GOTO(pDst, terrno);
2,084✔
3839
    STREAM_CHECK_RET_GOTO(varColSetVarData(pDst, 0, metaReader.me.name, strlen(metaReader.me.name), false));
2,084✔
3840
  } else if (metaReader.me.type == TD_VIRTUAL_CHILD_TABLE){
865,532✔
3841
    int64_t suid = metaReader.me.ctbEntry.suid;
865,532✔
3842
    sStreamReaderInfo->storageApi.metaReaderFn.readerReleaseLock(&metaReader);
865,532✔
3843
    sStreamReaderInfo->storageApi.metaReaderFn.initReader(&metaReaderStable, pVnode, META_READER_LOCK, &sStreamReaderInfo->storageApi.metaFn);
865,532✔
3844

3845
    STREAM_CHECK_RET_GOTO(sStreamReaderInfo->storageApi.metaReaderFn.getTableEntryByUid(&metaReaderStable, suid));
865,532✔
3846
    SSchemaWrapper*  sSchemaWrapper = &metaReaderStable.me.stbEntry.schemaTag;
865,532✔
3847
    for (size_t i = 0; i < taosArrayGetSize(cols); i++){
2,234,422✔
3848
      col_id_t* id = taosArrayGet(cols, i);
1,368,890✔
3849
      STREAM_CHECK_NULL_GOTO(id, terrno);
1,368,890✔
3850
      if (*id == -1) {
1,368,890✔
3851
        SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN, -1);
856,364✔
3852
        STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
856,364✔
3853
        continue;
856,364✔
3854
      }
3855
      size_t j = 0;
512,526✔
3856
      for (; j < sSchemaWrapper->nCols; j++) {
986,950✔
3857
        SSchema* s = sSchemaWrapper->pSchema + j;
986,950✔
3858
        if (s->colId == *id) {
986,950✔
3859
          SColumnInfoData idata = createColumnInfoData(s->type, s->bytes, s->colId);
512,526✔
3860
          STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
512,526✔
3861
          break;
512,526✔
3862
        }
3863
      }
3864
      if (j == sSchemaWrapper->nCols) {
512,526✔
3865
        SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_NULL, CHAR_BYTES, *id);
×
3866
        STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
×
3867
      }
3868
    }
3869
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, 1));
865,532✔
3870
    pBlock->info.rows = 1;
865,532✔
3871
    
3872
    for (size_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++){
2,234,422✔
3873
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
1,368,890✔
3874
      STREAM_CHECK_NULL_GOTO(pDst, terrno);
1,368,890✔
3875

3876
      if (pDst->info.colId == -1) {
1,368,890✔
3877
        STREAM_CHECK_RET_GOTO(varColSetVarData(pDst, 0, metaReader.me.name, strlen(metaReader.me.name), false));
856,364✔
3878
        continue;
856,364✔
3879
      }
3880
      if (pDst->info.type == TSDB_DATA_TYPE_NULL) {
512,526✔
3881
        STREAM_CHECK_RET_GOTO(colDataSetVal(pDst, 0, NULL, true));
×
3882
        continue;
×
3883
      }
3884

3885
      STagVal val = {0};
512,526✔
3886
      val.cid = pDst->info.colId;
512,526✔
3887
      const char* p = sStreamReaderInfo->storageApi.metaFn.extractTagVal(metaReader.me.ctbEntry.pTags, pDst->info.type, &val);
512,526✔
3888

3889
      char* data = NULL;
512,526✔
3890
      if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
512,526✔
3891
        data = tTagValToData((const STagVal*)p, false);
512,526✔
3892
      } else {
3893
        data = (char*)p;
×
3894
      }
3895

3896
      STREAM_CHECK_RET_GOTO(colDataSetVal(pDst, 0, data,
512,526✔
3897
                            (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data))));
3898

3899
      if ((pDst->info.type != TSDB_DATA_TYPE_JSON) && (p != NULL) && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
512,526✔
3900
          (data != NULL)) {
3901
        taosMemoryFree(data);
468,064✔
3902
      }
3903
    }
3904
  } else {
3905
    stError("vgId:%d %s, invalid table type:%d", TD_VID(pVnode), __func__, metaReader.me.type);
×
3906
    code = TSDB_CODE_INVALID_PARA;
×
3907
    goto end;
×
3908
  }
3909
  
3910
  stsDebug("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlock->info.rows);
867,616✔
3911
  printDataBlock(pBlock, __func__, "", streamId);
867,616✔
3912
  STREAM_CHECK_RET_GOTO(buildRsp(pBlock, &buf, &size));
867,616✔
3913

3914
end:
867,616✔
3915
  if(size == 0){
867,616✔
3916
    code = TSDB_CODE_STREAM_NO_DATA;
×
3917
  }
3918
  sStreamReaderInfo->storageApi.metaReaderFn.clearReader(&metaReaderStable);
867,616✔
3919
  sStreamReaderInfo->storageApi.metaReaderFn.clearReader(&metaReader);
867,616✔
3920
  STREAM_PRINT_LOG_END(code, lino);
867,616✔
3921
  SRpcMsg rsp = {
867,616✔
3922
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
3923
  tmsgSendRsp(&rsp);
867,616✔
3924
  blockDataDestroy(pBlock);
867,616✔
3925
  return code;
867,616✔
3926
}
3927

3928
static int32_t vnodeProcessStreamFetchMsg(SVnode* pVnode, SRpcMsg* pMsg, SQueueInfo *pInfo) {
6,394,442✔
3929
  int32_t            code = 0;
6,394,442✔
3930
  int32_t            lino = 0;
6,394,442✔
3931
  void*              buf = NULL;
6,394,442✔
3932
  size_t             size = 0;
6,394,442✔
3933
  void*              taskAddr = NULL;
6,394,442✔
3934
  SArray*            pResList = NULL;
6,394,442✔
3935
  bool               hasNext = false;
6,394,442✔
3936

3937
  SResFetchReq req = {0};
6,394,442✔
3938
  STREAM_CHECK_CONDITION_GOTO(tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0,
6,394,442✔
3939
                              TSDB_CODE_QRY_INVALID_INPUT);
3940
  SArray* calcInfoList = (SArray*)qStreamGetReaderInfo(req.queryId, req.taskId, &taskAddr);
6,394,194✔
3941
  STREAM_CHECK_NULL_GOTO(calcInfoList, terrno);
6,394,442✔
3942

3943
  STREAM_CHECK_CONDITION_GOTO(req.execId < 0, TSDB_CODE_INVALID_PARA);
6,394,442✔
3944
  SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo = taosArrayGetP(calcInfoList, req.execId);
6,394,442✔
3945
  STREAM_CHECK_NULL_GOTO(sStreamReaderCalcInfo, terrno);
6,394,442✔
3946
  sStreamReaderCalcInfo->rtInfo.execId = req.execId;
6,394,442✔
3947

3948
  void* pTask = sStreamReaderCalcInfo->pTask;
6,394,442✔
3949
  ST_TASK_DLOG("vgId:%d %s start, execId:%d, reset:%d, pTaskInfo:%p, scan type:%d", TD_VID(pVnode), __func__, req.execId, req.reset,
6,394,442✔
3950
               sStreamReaderCalcInfo->pTaskInfo, nodeType(sStreamReaderCalcInfo->calcAst->pNode));
3951

3952
  if (req.reset) {
6,394,442✔
3953
    int64_t uid = 0;
4,163,427✔
3954
    if (req.dynTbname) {
4,163,427✔
3955
      SArray* vals = req.pStRtFuncInfo->pStreamPartColVals;
78,942✔
3956
      for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
78,942✔
3957
        SStreamGroupValue* pValue = taosArrayGet(vals, i);
78,942✔
3958
        if (pValue != NULL && pValue->isTbname) {
78,942✔
3959
          uid = pValue->uid;
78,942✔
3960
          break;
78,942✔
3961
        }
3962
      }
3963
    }
3964
    
3965
    SReadHandle handle = {0};
4,163,427✔
3966
    handle.vnode = pVnode;
4,163,427✔
3967
    handle.pMsgCb = &pVnode->msgCb;
4,163,427✔
3968
    handle.pWorkerCb = pInfo->workerCb;
4,163,427✔
3969
    handle.uid = uid;
4,163,427✔
3970
    handle.cacheSttStatis = true;
4,163,427✔
3971

3972
    initStorageAPI(&handle.api);
4,163,427✔
3973
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(sStreamReaderCalcInfo->calcAst->pNode) ||
4,163,427✔
3974
      QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == nodeType(sStreamReaderCalcInfo->calcAst->pNode)){
3,391,553✔
3975
      STimeRangeNode* node = (STimeRangeNode*)((STableScanPhysiNode*)(sStreamReaderCalcInfo->calcAst->pNode))->pTimeRange;
3,215,375✔
3976
      if (node != NULL) {
3,215,375✔
3977
        STREAM_CHECK_RET_GOTO(processCalaTimeRange(sStreamReaderCalcInfo, &req, node, &handle, false));
427,305✔
3978
      } else {
3979
        ST_TASK_DLOG("vgId:%d %s no scan time range node", TD_VID(pVnode), __func__);
2,788,070✔
3980
      }
3981

3982
      node = (STimeRangeNode*)((STableScanPhysiNode*)(sStreamReaderCalcInfo->calcAst->pNode))->pExtTimeRange;
3,215,375✔
3983
      if (node != NULL) {
3,215,375✔
3984
        STREAM_CHECK_RET_GOTO(processCalaTimeRange(sStreamReaderCalcInfo, &req, node, &handle, true));
76,384✔
3985
      } else {
3986
        ST_TASK_DLOG("vgId:%d %s no interp time range node", TD_VID(pVnode), __func__);
3,138,991✔
3987
      }      
3988
    }
3989

3990
    TSWAP(sStreamReaderCalcInfo->rtInfo.funcInfo, *req.pStRtFuncInfo);
4,163,427✔
3991
    sStreamReaderCalcInfo->rtInfo.funcInfo.hasPlaceHolder = sStreamReaderCalcInfo->hasPlaceHolder;
4,163,427✔
3992
    handle.streamRtInfo = &sStreamReaderCalcInfo->rtInfo;
4,163,427✔
3993

3994
    if (sStreamReaderCalcInfo->pTaskInfo == NULL || !qNeedReset(sStreamReaderCalcInfo->pTaskInfo)) {
4,163,427✔
3995
      qDestroyTask(sStreamReaderCalcInfo->pTaskInfo);
604,546✔
3996
      STREAM_CHECK_RET_GOTO(qCreateStreamExecTaskInfo(&sStreamReaderCalcInfo->pTaskInfo,
604,546✔
3997
                                                    sStreamReaderCalcInfo->calcScanPlan, &handle, NULL, TD_VID(pVnode),
3998
                                                    req.taskId));
3999
    } else {
4000
      STREAM_CHECK_RET_GOTO(qResetTableScan(sStreamReaderCalcInfo->pTaskInfo, &handle));
3,558,881✔
4001
    }
4002

4003
    STREAM_CHECK_RET_GOTO(qSetTaskId(sStreamReaderCalcInfo->pTaskInfo, req.taskId, req.queryId));
4,162,680✔
4004
  }
4005

4006
  if (req.pOpParam != NULL) {
6,393,447✔
4007
    qUpdateOperatorParam(sStreamReaderCalcInfo->pTaskInfo, (void*)req.pOpParam);
340,742✔
4008
  }
4009

4010
  pResList = taosArrayInit(4, POINTER_BYTES);
6,393,447✔
4011
  STREAM_CHECK_NULL_GOTO(pResList, terrno);
6,394,210✔
4012
  uint64_t ts = 0;
6,394,210✔
4013
  STREAM_CHECK_RET_GOTO(qExecTaskOpt(sStreamReaderCalcInfo->pTaskInfo, pResList, &ts, &hasNext, NULL, req.pOpParam != NULL));
6,393,675✔
4014

4015
  for(size_t i = 0; i < taosArrayGetSize(pResList); i++){
15,281,444✔
4016
    SSDataBlock* pBlock = taosArrayGetP(pResList, i);
8,888,394✔
4017
    if (pBlock == NULL) continue;
8,888,394✔
4018
    printDataBlock(pBlock, __func__, "fetch", ((SStreamTask*)pTask)->streamId);
8,888,394✔
4019
/*    
4020
    if (sStreamReaderCalcInfo->rtInfo.funcInfo.withExternalWindow) {
4021
      STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, sStreamReaderCalcInfo->pFilterInfo, NULL));
4022
      printDataBlock(pBlock, __func__, "fetch filter");
4023
    }
4024
*/    
4025
  }
4026

4027
end:
6,394,690✔
4028
  STREAM_CHECK_RET_GOTO(streamBuildFetchRsp(pResList, hasNext, &buf, &size, pVnode->config.tsdbCfg.precision));
6,394,442✔
4029
  taosArrayDestroy(pResList);
6,394,442✔
4030
  streamReleaseTask(taskAddr);
6,394,442✔
4031

4032
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_TDB_TABLE_NOT_EXIST){
6,394,442✔
4033
    code = TDB_CODE_SUCCESS;
×
4034
  }
4035
  STREAM_PRINT_LOG_END(code, lino);
6,394,442✔
4036
  SRpcMsg rsp = {.msgType = TDMT_STREAM_FETCH_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
6,394,442✔
4037
  tmsgSendRsp(&rsp);
6,394,442✔
4038
  tDestroySResFetchReq(&req);
6,393,454✔
4039
  if (TDB_CODE_SUCCESS != code) {
6,393,922✔
4040
    ST_TASK_ELOG("vgId:%d %s failed, code:%d - %s", TD_VID(pVnode), __func__,
×
4041
                 code, tstrerror(code));
4042
  }
4043
  return code;
6,393,922✔
4044
}
4045

4046
static int32_t initTableList(SStreamTriggerReaderInfo* sStreamReaderInfo, SVnode* pVnode) {
33,673,202✔
4047
  int32_t code = 0;
33,673,202✔
4048
  if (sStreamReaderInfo->tableList.pTableList != NULL) {  
33,673,202✔
4049
    return code;
33,380,409✔
4050
  }
4051
  taosWLockLatch(&sStreamReaderInfo->lock);
293,520✔
4052
  sStreamReaderInfo->pVnode = pVnode;
293,520✔
4053
  initStorageAPI(&sStreamReaderInfo->storageApi);
293,520✔
4054
  if (sStreamReaderInfo->tableList.pTableList == NULL) {
293,069✔
4055
    code = initStreamTableListInfo(&sStreamReaderInfo->tableList);
293,069✔
4056
    if (code == 0) {
293,520✔
4057
      code = generateTablistForStreamReader(pVnode, sStreamReaderInfo);
293,520✔
4058
      if (code != 0) {
293,520✔
4059
        qStreamDestroyTableInfo(&sStreamReaderInfo->tableList);
×
4060
      } else {
4061
        sStreamReaderInfo->tableList.version = pVnode->state.applied;
293,520✔
4062
        stDebug("vgId:%d %s init table list for stream reader, table num:%zu, version:%" PRId64,
293,520✔
4063
                TD_VID(pVnode), __func__, taosArrayGetSize(sStreamReaderInfo->tableList.pTableList), sStreamReaderInfo->tableList.version);
4064
      }
4065
    }
4066
  }
4067
  taosWUnLockLatch(&sStreamReaderInfo->lock);
293,520✔
4068
  return code;
293,520✔
4069
}
4070

4071
int32_t vnodeProcessStreamReaderMsg(SVnode* pVnode, SRpcMsg* pMsg, SQueueInfo *pInfo) {
40,183,787✔
4072
  int32_t                   code = 0;
40,183,787✔
4073
  int32_t                   lino = 0;
40,183,787✔
4074
  SSTriggerPullRequestUnion req = {0};
40,183,787✔
4075
  void*                     taskAddr = NULL;
40,184,680✔
4076
  bool                      sendRsp = false;
40,182,041✔
4077

4078
  vDebug("vgId:%d, msg:%p in stream reader queue is processing", pVnode->config.vgId, pMsg);
40,182,041✔
4079
  if (!syncIsReadyForRead(pVnode->sync)) {
40,184,038✔
4080
    vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
96,592✔
4081
    return 0;
96,407✔
4082
  }
4083

4084
  if (pMsg->msgType == TDMT_STREAM_FETCH) {
40,088,115✔
4085
    return vnodeProcessStreamFetchMsg(pVnode, pMsg, pInfo);
6,394,442✔
4086
  } else if (pMsg->msgType == TDMT_STREAM_TRIGGER_PULL) {
33,689,321✔
4087
    void*   pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
33,693,673✔
4088
    int32_t len = pMsg->contLen - sizeof(SMsgHead);
33,693,907✔
4089
    STREAM_CHECK_RET_GOTO(tDeserializeSTriggerPullRequest(pReq, len, &req));
33,693,003✔
4090
    stDebug("vgId:%d %s start, type:%d, streamId:%" PRIx64 ", readerTaskId:%" PRIx64 ", sessionId:%" PRIx64 ", applied:%" PRIx64,
33,690,525✔
4091
            TD_VID(pVnode), __func__, req.base.type, req.base.streamId, req.base.readerTaskId, req.base.sessionId, pVnode->state.applied);
4092
    SStreamTriggerReaderInfo* sStreamReaderInfo = qStreamGetReaderInfo(req.base.streamId, req.base.readerTaskId, &taskAddr);
33,691,973✔
4093
    STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
33,691,075✔
4094
    STREAM_CHECK_RET_GOTO(initTableList(sStreamReaderInfo, pVnode));
33,672,759✔
4095
    sendRsp = true;
33,673,927✔
4096
    switch (req.base.type) {
33,673,927✔
4097
      case STRIGGER_PULL_SET_TABLE:
133,213✔
4098
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamSetTableReq(pVnode, pMsg, &req, sStreamReaderInfo));
133,213✔
4099
        break;
133,213✔
4100
      case STRIGGER_PULL_LAST_TS:
285,104✔
4101
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamLastTsReq(pVnode, pMsg, &req, sStreamReaderInfo));
285,104✔
4102
        break;
285,104✔
4103
      case STRIGGER_PULL_FIRST_TS:
238,759✔
4104
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamFirstTsReq(pVnode, pMsg, &req, sStreamReaderInfo));
238,759✔
4105
        break;
238,993✔
4106
      case STRIGGER_PULL_TSDB_META:
440,048✔
4107
      case STRIGGER_PULL_TSDB_META_NEXT:
4108
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbMetaReq(pVnode, pMsg, &req, sStreamReaderInfo));
440,048✔
4109
        break;
440,048✔
4110
      case STRIGGER_PULL_TSDB_TS_DATA:
128,344✔
4111
        if (sStreamReaderInfo->isVtableStream) {
128,344✔
4112
          STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbTsDataReqVTable(pVnode, pMsg, &req, sStreamReaderInfo));
×
4113
        } else {
4114
          STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbTsDataReqNonVTable(pVnode, pMsg, &req, sStreamReaderInfo));
128,344✔
4115
        }
4116
        break;
128,344✔
4117
      case STRIGGER_PULL_TSDB_TRIGGER_DATA:
112,056✔
4118
      case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT:
4119
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbTriggerDataReq(pVnode, pMsg, &req, sStreamReaderInfo));
112,056✔
4120
        break;
56,145✔
4121
      case STRIGGER_PULL_TSDB_CALC_DATA:
6,204,452✔
4122
      case STRIGGER_PULL_TSDB_CALC_DATA_NEXT:
4123
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbCalcDataReq(pVnode, pMsg, &req, sStreamReaderInfo));
6,204,452✔
4124
        break;
6,203,216✔
4125
      case STRIGGER_PULL_TSDB_DATA:
491,320✔
4126
      case STRIGGER_PULL_TSDB_DATA_NEXT:
4127
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbVirtalDataReq(pVnode, pMsg, &req, sStreamReaderInfo));
491,320✔
4128
        break;
489,028✔
4129
      case STRIGGER_PULL_GROUP_COL_VALUE:
282,087✔
4130
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamGroupColValueReq(pVnode, pMsg, &req, sStreamReaderInfo));
282,087✔
4131
        break;
282,087✔
4132
      case STRIGGER_PULL_VTABLE_INFO:
75,475✔
4133
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamVTableInfoReq(pVnode, pMsg, &req, sStreamReaderInfo));
75,475✔
4134
        break;
75,475✔
4135
      case STRIGGER_PULL_VTABLE_PSEUDO_COL:
867,616✔
4136
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamVTableTagInfoReq(pVnode, pMsg, &req, sStreamReaderInfo));
867,616✔
4137
        break;
867,616✔
4138
      case STRIGGER_PULL_OTABLE_INFO:
133,213✔
4139
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamOTableInfoReq(pVnode, pMsg, &req, sStreamReaderInfo));
133,213✔
4140
        break;
133,213✔
4141
      case STRIGGER_PULL_WAL_META_NEW:
11,992,302✔
4142
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalMetaNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
11,992,302✔
4143
        break;
11,990,681✔
4144
      case STRIGGER_PULL_WAL_DATA_NEW:
7,935,411✔
4145
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalDataNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
7,935,411✔
4146
        break;
7,934,947✔
4147
      case STRIGGER_PULL_WAL_META_DATA_NEW:
3,367,769✔
4148
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalMetaDataNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
3,367,769✔
4149
        break;
3,366,603✔
4150
      case STRIGGER_PULL_WAL_CALC_DATA_NEW:
986,961✔
4151
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalCalcDataNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
986,961✔
4152
        break;
986,961✔
4153
      default:
3✔
4154
        vError("unknown inner msg type:%d in stream reader queue", req.base.type);
3✔
4155
        sendRsp = false;
490✔
4156
        STREAM_CHECK_RET_GOTO(TSDB_CODE_APP_ERROR);
490✔
4157
    }
4158
  } else {
4159
    vError("unknown msg type:%d in stream reader queue", pMsg->msgType);
232✔
4160
    STREAM_CHECK_RET_GOTO(TSDB_CODE_APP_ERROR);
×
4161
  }
4162
end:
33,679,965✔
4163

4164
  streamReleaseTask(taskAddr);
33,688,615✔
4165

4166
  tDestroySTriggerPullRequest(&req);
33,688,647✔
4167
  STREAM_PRINT_LOG_END(code, lino);
33,682,482✔
4168
  if (!sendRsp) {
33,690,349✔
4169
    SRpcMsg rsp = {
36,632✔
4170
      .code = code,
4171
      .pCont = pMsg->info.rsp,
18,316✔
4172
      .contLen = pMsg->info.rspLen,
18,316✔
4173
      .info = pMsg->info,
4174
    };
4175
    tmsgSendRsp(&rsp);
18,316✔
4176
  }
4177
  return code;
33,690,349✔
4178
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc