• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4822

27 Oct 2025 05:42AM UTC coverage: 59.732% (+1.0%) from 58.728%
#4822

push

travis-ci

web-flow
Merge pull request #33377 from taosdata/fix/main/rename-udf-path

fix: update UDF example links to correct file paths

121214 of 258518 branches covered (46.89%)

Branch coverage included in aggregate %.

193636 of 268583 relevant lines covered (72.1%)

4002399.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.48
/source/dnode/vnode/src/vnd/vnodeStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include <stdbool.h>
17
#include <stdint.h>
18
#include "nodes.h"
19
#include "osMemPool.h"
20
#include "osMemory.h"
21
#include "scalar.h"
22
#include "streamReader.h"
23
#include "taosdef.h"
24
#include "tarray.h"
25
#include "tcommon.h"
26
#include "tdatablock.h"
27
#include "tdb.h"
28
#include "tdef.h"
29
#include "tencode.h"
30
#include "tglobal.h"
31
#include "thash.h"
32
#include "tlist.h"
33
#include "tmsg.h"
34
#include "tsimplehash.h"
35
#include "vnd.h"
36
#include "vnode.h"
37
#include "vnodeInt.h"
38

39
#define BUILD_OPTION(options, sStreamReaderInfo, _ver, _order, startTime, endTime, _schemas, _isSchema, _scanMode, \
40
                     _gid, _initReader, _mapInfo)                                                                        \
41
  SStreamTriggerReaderTaskInnerOptions options = {.suid = (_mapInfo == NULL ? sStreamReaderInfo->suid : 0),              \
42
                                                  .ver = _ver,                                                     \
43
                                                  .order = _order,                                                 \
44
                                                  .twindows = {.skey = startTime, .ekey = endTime},                \
45
                                                  .sStreamReaderInfo = sStreamReaderInfo,                     \
46
                                                  .schemas = _schemas,                                             \
47
                                                  .isSchema = _isSchema,                                           \
48
                                                  .scanMode = _scanMode,                                           \
49
                                                  .gid = _gid,                                                     \
50
                                                  .initReader = _initReader,                                       \
51
                                                  .mapInfo = _mapInfo};
52

53
typedef struct WalMetaResult {
54
  uint64_t    id;
55
  int64_t     skey;
56
  int64_t     ekey;
57
} WalMetaResult;
58

59
static int64_t getSessionKey(int64_t session, int64_t type) { return (session | (type << 32)); }
30,055✔
60

61
static int32_t buildScheamFromMeta(SVnode* pVnode, int64_t uid, SArray** schemas);
62

63
int32_t sortCid(const void *lp, const void *rp) {
1,537✔
64
  int16_t* c1 = (int16_t*)lp;
1,537✔
65
  int16_t* c2 = (int16_t*)rp;
1,537✔
66

67
  if (*c1 < *c2) {
1,537✔
68
    return -1;
1,532✔
69
  } else if (*c1 > *c2) {
5!
70
    return 1;
5✔
71
  }
72

73
  return 0;
×
74
}
75

76
int32_t sortSSchema(const void *lp, const void *rp) {
1,533✔
77
  SSchema* c1 = (SSchema*)lp;
1,533✔
78
  SSchema* c2 = (SSchema*)rp;
1,533✔
79

80
  if (c1->colId < c2->colId) {
1,533✔
81
    return -1;
1,528✔
82
  } else if (c1->colId > c2->colId) {
5!
83
    return 1;
5✔
84
  }
85

86
  return 0;
×
87
}
88

89
static int32_t addColData(SSDataBlock* pResBlock, int32_t index, void* data) {
63,440✔
90
  SColumnInfoData* pSrc = taosArrayGet(pResBlock->pDataBlock, index);
63,440✔
91
  if (pSrc == NULL) {
63,453✔
92
    return terrno;
9✔
93
  }
94

95
  memcpy(pSrc->pData + pResBlock->info.rows * pSrc->info.bytes, data, pSrc->info.bytes);
63,444✔
96
  return 0;
63,444✔
97
}
98

99
static int32_t getTableDataInfo(SStreamReaderTaskInner* pTask, bool* hasNext) {
38,162✔
100
  int32_t code = pTask->api.tsdReader.tsdNextDataBlock(pTask->pReader, hasNext);
38,162✔
101
  if (code != TSDB_CODE_SUCCESS) {
38,156!
102
    pTask->api.tsdReader.tsdReaderReleaseDataBlock(pTask->pReader);
×
103
  }
104

105
  return code;
38,157✔
106
}
107

108
static int32_t getTableData(SStreamReaderTaskInner* pTask, SSDataBlock** ppRes) {
3,773✔
109
  return pTask->api.tsdReader.tsdReaderRetrieveDataBlock(pTask->pReader, ppRes, NULL);
3,773✔
110
}
111

112
static int32_t buildOTableInfoRsp(const SSTriggerOrigTableInfoRsp* rsp, void** data, size_t* size) {
196✔
113
  int32_t code = 0;
196✔
114
  int32_t lino = 0;
196✔
115
  void*   buf = NULL;
196✔
116
  int32_t len = tSerializeSTriggerOrigTableInfoRsp(NULL, 0, rsp);
196✔
117
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
196!
118
  buf = rpcMallocCont(len);
196✔
119
  STREAM_CHECK_NULL_GOTO(buf, terrno);
196!
120
  int32_t actLen = tSerializeSTriggerOrigTableInfoRsp(buf, len, rsp);
196✔
121
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
196!
122
  *data = buf;
196✔
123
  *size = len;
196✔
124
  buf = NULL;
196✔
125
end:
196✔
126
  rpcFreeCont(buf);
196✔
127
  return code;
196✔
128
}
129

130
static bool needRefreshTableList(SStreamTriggerReaderInfo* sStreamReaderInfo, int8_t tableType, int64_t suid, int64_t uid, bool isCalc){
7,255✔
131
  if (sStreamReaderInfo->isVtableStream) {
7,255✔
132
    int64_t id[2] = {suid, uid};
5,764✔
133
    if(tSimpleHashGet(isCalc ? sStreamReaderInfo->uidHashCalc : sStreamReaderInfo->uidHashTrigger, id, sizeof(id)) == NULL) {
5,764!
134
      return true;
5,757✔
135
    }
136
  } else {
137
    if (tableType != TD_CHILD_TABLE) {
1,491✔
138
      return false;
521✔
139
    }
140
    if (sStreamReaderInfo->tableType == TD_SUPER_TABLE && 
970✔
141
        suid == sStreamReaderInfo->suid && 
596✔
142
        qStreamGetGroupId(sStreamReaderInfo->tableList, uid) == -1) {
23✔
143
      return true;
9✔
144
    }
145
  }
146
  return false;
969✔
147
}
148

149
static bool uidInTableList(SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t suid, int64_t uid, uint64_t* id, bool isCalc){
87,176✔
150
  if (sStreamReaderInfo->isVtableStream) {
87,176✔
151
    int64_t tmp[2] = {suid, uid};
62,429✔
152
    if(tSimpleHashGet(isCalc ? sStreamReaderInfo->uidHashCalc : sStreamReaderInfo->uidHashTrigger, tmp, sizeof(tmp)) == NULL) {
62,429✔
153
      return false;
31,005✔
154
    }
155
    *id = uid;
31,414✔
156
  } else {
157
    if (sStreamReaderInfo->tableList == NULL) return false;
24,747!
158

159
    if (sStreamReaderInfo->tableType == TD_SUPER_TABLE) {
24,747✔
160
      if (suid != sStreamReaderInfo->suid) return false;
16,640✔
161
      if (sStreamReaderInfo->pTagCond == NULL) {
12,993✔
162
        if (sStreamReaderInfo->partitionCols == NULL){
11,474✔
163
          *id = 0;
40✔
164
        } else if (sStreamReaderInfo->groupByTbname){
11,434✔
165
          *id= uid;
10,732✔
166
        } else {
167
          *id = qStreamGetGroupId(sStreamReaderInfo->tableList, uid);
702✔
168
          if (*id == -1) return false;
704!
169
        }
170
      } else {
171
        //*id= uid;
172
        *id = qStreamGetGroupId(sStreamReaderInfo->tableList, uid);
1,519✔
173
        if (*id == -1) return false;
1,670✔
174
      }
175
    } else {
176
      *id = qStreamGetGroupId(sStreamReaderInfo->tableList, uid);
8,107✔
177
      if(*id == -1) *id = uid;
8,507✔
178
      return uid == sStreamReaderInfo->uid;
8,507✔
179
    }
180
  }
181
  return true;
43,861✔
182
}
183

184
static int32_t generateTablistForStreamReader(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamReaderInfo, bool isHistory) {
12,696✔
185
  int32_t                   code = 0;
12,696✔
186
  int32_t                   lino = 0;
12,696✔
187
  SNodeList* groupNew = NULL;                                      
12,696✔
188
  STREAM_CHECK_RET_GOTO(nodesCloneList(sStreamReaderInfo->partitionCols, &groupNew));
12,696!
189

190
  SStorageAPI api = {0};
12,701✔
191
  initStorageAPI(&api);
12,701✔
192
  code = qStreamCreateTableListForReader(pVnode, sStreamReaderInfo->suid, sStreamReaderInfo->uid, sStreamReaderInfo->tableType, groupNew,
12,700✔
193
                                         true, sStreamReaderInfo->pTagCond, sStreamReaderInfo->pTagIndexCond, &api, 
194
                                         isHistory ? &sStreamReaderInfo->historyTableList : &sStreamReaderInfo->tableList,
195
                                         isHistory ? NULL : sStreamReaderInfo->groupIdMap);
196
  end:
12,702✔
197
  nodesDestroyList(groupNew);
12,702✔
198
  STREAM_PRINT_LOG_END(code, lino);
12,702!
199
  return code;
12,702✔
200
}
201

202
static int32_t buildVTableInfoRsp(const SStreamMsgVTableInfo* rsp, void** data, size_t* size) {
733✔
203
  int32_t code = 0;
733✔
204
  int32_t lino = 0;
733✔
205
  void*   buf = NULL;
733✔
206
  int32_t len = tSerializeSStreamMsgVTableInfo(NULL, 0, rsp);
733✔
207
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
733!
208
  buf = rpcMallocCont(len);
733✔
209
  STREAM_CHECK_NULL_GOTO(buf, terrno);
733!
210
  int32_t actLen = tSerializeSStreamMsgVTableInfo(buf, len, rsp);
733✔
211
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
733!
212
  *data = buf;
733✔
213
  *size = len;
733✔
214
  buf = NULL;
733✔
215
end:
733✔
216
  rpcFreeCont(buf);
733✔
217
  return code;
733✔
218
}
219

220
static int32_t buildTsRsp(const SStreamTsResponse* tsRsp, void** data, size_t* size) {
1,036✔
221
  int32_t code = 0;
1,036✔
222
  int32_t lino = 0;
1,036✔
223
  void*   buf = NULL;
1,036✔
224
  int32_t len = tSerializeSStreamTsResponse(NULL, 0, tsRsp);
1,036✔
225
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
1,037!
226
  buf = rpcMallocCont(len);
1,037✔
227
  STREAM_CHECK_NULL_GOTO(buf, terrno);
1,036!
228
  int32_t actLen = tSerializeSStreamTsResponse(buf, len, tsRsp);
1,036✔
229
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
1,037!
230
  *data = buf;
1,037✔
231
  *size = len;
1,037✔
232
  buf = NULL;
1,037✔
233
end:
1,037✔
234
  rpcFreeCont(buf);
1,037✔
235
  return code;
1,036✔
236
}
237

238

239
static int32_t buildRsp(SSDataBlock* pBlock, void** data, size_t* size) {
31,972✔
240
  int32_t code = 0;
31,972✔
241
  int32_t lino = 0;
31,972✔
242
  void*   buf = NULL;
31,972✔
243
  STREAM_CHECK_CONDITION_GOTO(pBlock == NULL || pBlock->info.rows == 0, TSDB_CODE_SUCCESS);
31,972!
244
  size_t dataEncodeSize = blockGetEncodeSize(pBlock);
3,916✔
245
  buf = rpcMallocCont(dataEncodeSize);
3,917✔
246
  STREAM_CHECK_NULL_GOTO(buf, terrno);
3,916!
247
  int32_t actualLen = blockEncode(pBlock, buf, dataEncodeSize, taosArrayGetSize(pBlock->pDataBlock));
3,916✔
248
  STREAM_CHECK_CONDITION_GOTO(actualLen < 0, terrno);
3,916!
249
  *data = buf;
3,916✔
250
  *size = dataEncodeSize;
3,916✔
251
  buf = NULL;
3,916✔
252
end:
31,972✔
253
  rpcFreeCont(buf);
31,972✔
254
  return code;
31,972✔
255
}
256

257
static int32_t resetTsdbReader(SStreamReaderTaskInner* pTask) {
2,518✔
258
  int32_t        pNum = 1;
2,518✔
259
  STableKeyInfo* pList = NULL;
2,518✔
260
  int32_t        code = 0;
2,518✔
261
  int32_t        lino = 0;
2,518✔
262
  STREAM_CHECK_RET_GOTO(qStreamGetTableList(pTask->pTableList, pTask->currentGroupIndex, &pList, &pNum));
2,518!
263
  if (pList == NULL || pNum == 0) {
2,518!
264
    code = TSDB_CODE_INVALID_PARA;
×
265
    goto end;
×
266
  }
267
  STREAM_CHECK_RET_GOTO(pTask->api.tsdReader.tsdSetQueryTableList(pTask->pReader, pList, pNum));
2,518!
268

269
  cleanupQueryTableDataCond(&pTask->cond);
2,518✔
270
  uint64_t suid = pTask->options.sStreamReaderInfo->isVtableStream ? pList->groupId : pTask->options.suid;
2,518✔
271
  STREAM_CHECK_RET_GOTO(qStreamInitQueryTableDataCond(&pTask->cond, pTask->options.order, pTask->options.schemas, true,
2,518!
272
                                                      pTask->options.twindows, suid, pTask->options.ver, NULL));
273
  STREAM_CHECK_RET_GOTO(pTask->api.tsdReader.tsdReaderResetStatus(pTask->pReader, &pTask->cond));
2,518!
274

275
end:
2,518✔
276
  STREAM_PRINT_LOG_END(code, lino);
2,518!
277
  return code;
2,518✔
278
}
279

280
static int32_t buildWalMetaBlock(SSDataBlock* pBlock, int8_t type, int64_t id, bool isVTable, int64_t uid,
×
281
                                 int64_t skey, int64_t ekey, int64_t ver, int64_t rows) {
282
  int32_t code = 0;
×
283
  int32_t lino = 0;
×
284
  int32_t index = 0;
×
285
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &type));
×
286
  if (!isVTable) {
×
287
    STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &id));
×
288
  }
289
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &uid));
×
290
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &skey));
×
291
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &ekey));
×
292
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &ver));
×
293
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &rows));
×
294

295
end:
×
296
  // STREAM_PRINT_LOG_END(code, lino)
297
  return code;
×
298
}
299

300
static int32_t buildWalMetaBlockNew(SSDataBlock* pBlock, int64_t id, int64_t skey, int64_t ekey, int64_t ver) {
15,514✔
301
  int32_t code = 0;
15,514✔
302
  int32_t lino = 0;
15,514✔
303
  int32_t index = 0;
15,514✔
304
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &id));
15,514!
305
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &skey));
15,513!
306
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &ekey));
15,492!
307
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &ver));
15,483!
308

309
end:
15,483✔
310
  return code;
15,483✔
311
}
312

313
static int32_t buildDropTableBlock(SSDataBlock* pBlock, int64_t id, int64_t ver) {
1✔
314
  int32_t code = 0;
1✔
315
  int32_t lino = 0;
1✔
316
  int32_t index = 0;
1✔
317
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &id));
1!
318
  STREAM_CHECK_RET_GOTO(addColData(pBlock, index++, &ver));
1!
319

320
end:
1✔
321
  return code;
1✔
322
}
323

324
static void buildTSchema(STSchema* pTSchema, int32_t ver, col_id_t colId, int8_t type, int32_t bytes) {
×
325
  pTSchema->numOfCols = 1;
×
326
  pTSchema->version = ver;
×
327
  pTSchema->columns[0].colId = colId;
×
328
  pTSchema->columns[0].type = type;
×
329
  pTSchema->columns[0].bytes = bytes;
×
330
}
×
331

332
static int32_t scanDeleteDataNew(SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* rsp, void* data, int32_t len,
115✔
333
                              int64_t ver) {
334
  int32_t    code = 0;
115✔
335
  int32_t    lino = 0;
115✔
336
  SDecoder   decoder = {0};
115✔
337
  SDeleteRes req = {0};
115✔
338
  void* pTask = sStreamReaderInfo->pTask;
115✔
339

340
  req.uidList = taosArrayInit(0, sizeof(tb_uid_t));
115✔
341
  tDecoderInit(&decoder, data, len);
115✔
342
  STREAM_CHECK_RET_GOTO(tDecodeDeleteRes(&decoder, &req));
115!
343
  STREAM_CHECK_CONDITION_GOTO((sStreamReaderInfo->tableType == TSDB_SUPER_TABLE && !sStreamReaderInfo->isVtableStream && req.suid != sStreamReaderInfo->suid), TDB_CODE_SUCCESS);
115✔
344
  
345
  for (int32_t i = 0; i < taosArrayGetSize(req.uidList); i++) {
174✔
346
    uint64_t* uid = taosArrayGet(req.uidList, i);
100✔
347
    STREAM_CHECK_NULL_GOTO(uid, terrno);
126!
348
    uint64_t   id = 0;
100✔
349
    ST_TASK_ILOG("stream reader scan delete start data:uid %" PRIu64 ", skey %" PRIu64 ", ekey %" PRIu64, *uid, req.skey, req.ekey);
100!
350
    STREAM_CHECK_CONDITION_GOTO(!uidInTableList(sStreamReaderInfo, req.suid, *uid, &id, false), TDB_CODE_SUCCESS);
100✔
351
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->deleteBlock, ((SSDataBlock*)rsp->deleteBlock)->info.rows + 1));
74!
352
    STREAM_CHECK_RET_GOTO(buildWalMetaBlockNew(rsp->deleteBlock, id, req.skey, req.ekey, ver));
74!
353
    ((SSDataBlock*)rsp->deleteBlock)->info.rows++;
74✔
354
    rsp->totalRows++;
74✔
355
  }
356

357
end:
74✔
358
  taosArrayDestroy(req.uidList);
115✔
359
  tDecoderClear(&decoder);
115✔
360
  return code;
115✔
361
}
362

363
static int32_t scanDropTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* rsp, void* data, int32_t len,
1✔
364
                             int64_t ver) {
365
  int32_t  code = 0;
1✔
366
  int32_t  lino = 0;
1✔
367
  SDecoder decoder = {0};
1✔
368
  void* pTask = sStreamReaderInfo->pTask;
1✔
369

370
  SVDropTbBatchReq req = {0};
1✔
371
  tDecoderInit(&decoder, data, len);
1✔
372
  STREAM_CHECK_RET_GOTO(tDecodeSVDropTbBatchReq(&decoder, &req));
1!
373

374
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
2✔
375
    SVDropTbReq* pDropTbReq = req.pReqs + iReq;
1✔
376
    STREAM_CHECK_NULL_GOTO(pDropTbReq, TSDB_CODE_INVALID_PARA);
1!
377
    uint64_t id = 0;
1✔
378
    if(!uidInTableList(sStreamReaderInfo, pDropTbReq->suid, pDropTbReq->uid, &id, false)) {
1!
379
      continue;
×
380
    }
381

382
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->dropBlock, ((SSDataBlock*)rsp->dropBlock)->info.rows + 1));
1!
383
    STREAM_CHECK_RET_GOTO(buildDropTableBlock(rsp->dropBlock, id, ver));
1!
384
    ((SSDataBlock*)rsp->dropBlock)->info.rows++;
1✔
385
    rsp->totalRows++;
1✔
386
    ST_TASK_ILOG("stream reader scan drop uid %" PRId64 ", id %" PRIu64, pDropTbReq->uid, id);
1!
387
  }
388

389
end:
1✔
390
  tDecoderClear(&decoder);
1✔
391
  return code;
1✔
392
}
393

394
static int32_t reloadTableList(SStreamTriggerReaderInfo* sStreamReaderInfo){
5,787✔
395
  (void)taosThreadMutexLock(&sStreamReaderInfo->mutex);
5,787✔
396
  qStreamDestroyTableList(sStreamReaderInfo->tableList);
5,787✔
397
  sStreamReaderInfo->tableList = NULL;
5,787✔
398
  int32_t code = generateTablistForStreamReader(sStreamReaderInfo->pVnode, sStreamReaderInfo, false);
5,787✔
399
  if (code == 0){
5,787!
400
    qStreamDestroyTableList(sStreamReaderInfo->historyTableList);
5,787✔
401
    sStreamReaderInfo->historyTableList = NULL;
5,786✔
402
    code = generateTablistForStreamReader(sStreamReaderInfo->pVnode, sStreamReaderInfo, true);
5,786✔
403
  }
404
  (void)taosThreadMutexUnlock(&sStreamReaderInfo->mutex);
5,787✔
405
  return code;
5,787✔
406
}
407

408
static int32_t scanCreateTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, void* data, int32_t len) {
102✔
409
  int32_t  code = 0;
102✔
410
  int32_t  lino = 0;
102✔
411
  SDecoder decoder = {0};
102✔
412
  void* pTask = sStreamReaderInfo->pTask;
102✔
413

414
  SVCreateTbBatchReq req = {0};
102✔
415
  tDecoderInit(&decoder, data, len);
102✔
416
  
417
  STREAM_CHECK_RET_GOTO(tDecodeSVCreateTbBatchReq(&decoder, &req));
102!
418

419
  bool found = false;
102✔
420
  SVCreateTbReq* pCreateReq = NULL;
102✔
421
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
155✔
422
    pCreateReq = req.pReqs + iReq;
102✔
423
    if (!needRefreshTableList(sStreamReaderInfo, pCreateReq->type, pCreateReq->ctb.suid, pCreateReq->uid, false)) {
102✔
424
      ST_TASK_ILOG("stream reader scan create table jump, %s", pCreateReq->name);
53!
425
      continue;
53✔
426
    }
427
    ST_TASK_ILOG("stream reader scan create table %s", pCreateReq->name);
49!
428

429
    found = true;
49✔
430
    break;
49✔
431
  }
432
  STREAM_CHECK_CONDITION_GOTO(!found, TDB_CODE_SUCCESS);
102✔
433

434
  STREAM_CHECK_RET_GOTO(reloadTableList(sStreamReaderInfo));
49!
435
end:
49✔
436
  tDeleteSVCreateTbBatchReq(&req);
102✔
437
  tDecoderClear(&decoder);
102✔
438
  return code;
102✔
439
}
440

441
static int32_t processAutoCreateTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, SVCreateTbReq* pCreateReq) {
7,154✔
442
  int32_t  code = 0;
7,154✔
443
  int32_t  lino = 0;
7,154✔
444
  void*    pTask = sStreamReaderInfo->pTask;
7,154✔
445
  if (!needRefreshTableList(sStreamReaderInfo, pCreateReq->type, pCreateReq->ctb.suid, pCreateReq->uid, false)) {
7,154✔
446
    ST_TASK_DLOG("stream reader scan auto create table jump, %s", pCreateReq->name);
1,440✔
447
    goto end;
1,439✔
448
  }
449
  ST_TASK_ILOG("stream reader scan auto create table %s", pCreateReq->name);
5,717!
450

451
  STREAM_CHECK_RET_GOTO(reloadTableList(sStreamReaderInfo));
5,717!
452
end:
5,717✔
453
  return code;
7,156✔
454
}
455

456
static int32_t scanAlterTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, void* data, int32_t len) {
125✔
457
  int32_t  code = 0;
125✔
458
  int32_t  lino = 0;
125✔
459
  SDecoder decoder = {0};
125✔
460
  void* pTask = sStreamReaderInfo->pTask;
125✔
461

462
  SVAlterTbReq req = {0};
125✔
463
  tDecoderInit(&decoder, data, len);
125✔
464
  
465
  STREAM_CHECK_RET_GOTO(tDecodeSVAlterTbReq(&decoder, &req));
125!
466
  STREAM_CHECK_CONDITION_GOTO(req.action != TSDB_ALTER_TABLE_UPDATE_TAG_VAL && req.action != TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL, TDB_CODE_SUCCESS);
122!
467

468
  ETableType tbType = 0;
35✔
469
  uint64_t suid = 0;
35✔
470
  STREAM_CHECK_RET_GOTO(metaGetTableTypeSuidByName(sStreamReaderInfo->pVnode, req.tbName, &tbType, &suid));
35!
471
  STREAM_CHECK_CONDITION_GOTO(tbType != TSDB_CHILD_TABLE, TDB_CODE_SUCCESS);
35!
472
  STREAM_CHECK_CONDITION_GOTO(suid != sStreamReaderInfo->suid, TDB_CODE_SUCCESS);
35✔
473

474
  STREAM_CHECK_RET_GOTO(reloadTableList(sStreamReaderInfo));
21!
475
  ST_TASK_ILOG("stream reader scan alter table %s", req.tbName);
21!
476

477
end:
×
478
  taosArrayDestroy(req.pMultiTag);
122✔
479
  tDecoderClear(&decoder);
122✔
480
  return code;
125✔
481
}
482

483
// static int32_t scanAlterSTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, void* data, int32_t len) {
484
//   int32_t  code = 0;
485
//   int32_t  lino = 0;
486
//   SDecoder decoder = {0};
487
//   SMAlterStbReq reqAlter = {0};
488
//   SVCreateStbReq req = {0};
489
//   tDecoderInit(&decoder, data, len);
490
//   void* pTask = sStreamReaderInfo->pTask;
491
  
492
//   STREAM_CHECK_RET_GOTO(tDecodeSVCreateStbReq(&decoder, &req));
493
//   STREAM_CHECK_CONDITION_GOTO(req.suid != sStreamReaderInfo->suid, TDB_CODE_SUCCESS);
494
//   if (req.alterOriData != 0) {
495
//     STREAM_CHECK_RET_GOTO(tDeserializeSMAlterStbReq(req.alterOriData, req.alterOriDataLen, &reqAlter));
496
//     STREAM_CHECK_CONDITION_GOTO(reqAlter.alterType != TSDB_ALTER_TABLE_DROP_TAG && reqAlter.alterType != TSDB_ALTER_TABLE_UPDATE_TAG_NAME, TDB_CODE_SUCCESS);
497
//   }
498
  
499
//   STREAM_CHECK_RET_GOTO(reloadTableList(sStreamReaderInfo));
500

501
//   ST_TASK_ILOG("stream reader scan alter suid %" PRId64, req.suid);
502
// end:
503
//   tFreeSMAltertbReq(&reqAlter);
504
//   tDecoderClear(&decoder);
505
//   return code;
506
// }
507

508
static int32_t scanDropSTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, void* data, int32_t len) {
×
509
  int32_t  code = 0;
×
510
  int32_t  lino = 0;
×
511
  SDecoder decoder = {0};
×
512
  void* pTask = sStreamReaderInfo->pTask;
×
513

514
  SVDropStbReq req = {0};
×
515
  tDecoderInit(&decoder, data, len);
×
516
  STREAM_CHECK_RET_GOTO(tDecodeSVDropStbReq(&decoder, &req));
×
517
  STREAM_CHECK_CONDITION_GOTO(req.suid != sStreamReaderInfo->suid, TDB_CODE_SUCCESS);
×
518
  STREAM_CHECK_RET_GOTO(reloadTableList(sStreamReaderInfo));
×
519

520
  ST_TASK_ILOG("stream reader scan drop suid %" PRId64, req.suid);
×
521
end:
×
522
  tDecoderClear(&decoder);
×
523
  return code;
×
524
}
525

526
static int32_t scanSubmitTbDataForMeta(SDecoder *pCoder, SStreamTriggerReaderInfo* sStreamReaderInfo, SSHashObj* gidHash) {
43,583✔
527
  int32_t code = 0;
43,583✔
528
  int32_t lino = 0;
43,583✔
529
  WalMetaResult walMeta = {0};
43,583✔
530
  SSubmitTbData submitTbData = {0};
43,583✔
531
  
532
  if (tStartDecode(pCoder) < 0) {
43,583!
533
    code = TSDB_CODE_INVALID_MSG;
×
534
    TSDB_CHECK_CODE(code, lino, end);
×
535
  }
536

537
  uint8_t       version = 0;
43,619✔
538
  if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) {
43,586!
539
    code = TSDB_CODE_INVALID_MSG;
×
540
    TSDB_CHECK_CODE(code, lino, end);
×
541
  }
542
  version = (submitTbData.flags >> 8) & 0xff;
43,586✔
543
  submitTbData.flags = submitTbData.flags & 0xff;
43,586✔
544

545
  // STREAM_CHECK_CONDITION_GOTO(version < 2, TDB_CODE_SUCCESS);
546
  if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
43,586✔
547
    submitTbData.pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
5,926!
548
    STREAM_CHECK_NULL_GOTO(submitTbData.pCreateTbReq, terrno);
5,927!
549
    STREAM_CHECK_RET_GOTO(tDecodeSVCreateTbReq(pCoder, submitTbData.pCreateTbReq));
5,927!
550
    STREAM_CHECK_RET_GOTO(processAutoCreateTableNew(sStreamReaderInfo, submitTbData.pCreateTbReq));
5,925!
551
  }
552

553
  // submit data
554
  if (tDecodeI64(pCoder, &submitTbData.suid) < 0) {
43,586!
555
    code = TSDB_CODE_INVALID_MSG;
×
556
    TSDB_CHECK_CODE(code, lino, end);
×
557
  }
558
  if (tDecodeI64(pCoder, &submitTbData.uid) < 0) {
43,569!
559
    code = TSDB_CODE_INVALID_MSG;
×
560
    TSDB_CHECK_CODE(code, lino, end);
×
561
  }
562

563
  if (!uidInTableList(sStreamReaderInfo, submitTbData.suid, submitTbData.uid, &walMeta.id, false)){
43,569✔
564
    goto end;
32,175✔
565
  }
566
  if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) {
11,436!
567
    code = TSDB_CODE_INVALID_MSG;
×
568
    TSDB_CHECK_CODE(code, lino, end);
×
569
  }
570

571
  if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
11,436!
572
    uint64_t nColData = 0;
×
573
    if (tDecodeU64v(pCoder, &nColData) < 0) {
×
574
      code = TSDB_CODE_INVALID_MSG;
×
575
      TSDB_CHECK_CODE(code, lino, end);
×
576
    }
577

578
    SColData colData = {0};
×
579
    code = tDecodeColData(version, pCoder, &colData, false);
×
580
    if (code) {
×
581
      code = TSDB_CODE_INVALID_MSG;
×
582
      TSDB_CHECK_CODE(code, lino, end);
×
583
    }
584

585
    if (colData.flag != HAS_VALUE) {
×
586
      code = TSDB_CODE_INVALID_MSG;
×
587
      TSDB_CHECK_CODE(code, lino, end);
×
588
    }
589
    walMeta.skey = ((TSKEY *)colData.pData)[0];
×
590
    walMeta.ekey = ((TSKEY *)colData.pData)[colData.nVal - 1];
×
591

592
    for (uint64_t i = 1; i < nColData; i++) {
×
593
      code = tDecodeColData(version, pCoder, &colData, true);
×
594
      if (code) {
×
595
        code = TSDB_CODE_INVALID_MSG;
×
596
        TSDB_CHECK_CODE(code, lino, end);
×
597
      }
598
    }
599
  } else {
600
    uint64_t nRow = 0;
11,436✔
601
    if (tDecodeU64v(pCoder, &nRow) < 0) {
11,436!
602
      code = TSDB_CODE_INVALID_MSG;
×
603
      TSDB_CHECK_CODE(code, lino, end);
×
604
    }
605

606
    for (int32_t iRow = 0; iRow < nRow; ++iRow) {
28,023✔
607
      SRow *pRow = (SRow *)(pCoder->data + pCoder->pos);
16,587✔
608
      pCoder->pos += pRow->len;
16,587✔
609
      if (iRow == 0){
16,587✔
610
#ifndef NO_UNALIGNED_ACCESS
611
        walMeta.skey = pRow->ts;
11,435✔
612
#else
613
        walMeta.skey = taosGetInt64Aligned(&pRow->ts);
614
#endif
615
      }
616
      if (iRow == nRow - 1) {
16,587✔
617
#ifndef NO_UNALIGNED_ACCESS
618
        walMeta.ekey = pRow->ts;
11,439✔
619
#else
620
        walMeta.ekey = taosGetInt64Aligned(&pRow->ts);
621
#endif
622
      }
623
    }
624
  }
625

626
  WalMetaResult* data = (WalMetaResult*)tSimpleHashGet(gidHash, &walMeta.id, LONG_BYTES);
11,436✔
627
  if (data != NULL) {
11,428!
628
    if (walMeta.skey < data->skey) data->skey = walMeta.skey;
×
629
    if (walMeta.ekey > data->ekey) data->ekey = walMeta.ekey;
×
630
  } else {
631
    STREAM_CHECK_RET_GOTO(tSimpleHashPut(gidHash, &walMeta.id, LONG_BYTES, &walMeta, sizeof(WalMetaResult)));
11,428!
632
  }
633

634
end:
11,424✔
635
  tDestroySVSubmitCreateTbReq(submitTbData.pCreateTbReq, TSDB_MSG_FLG_DECODE);
43,599✔
636
  taosMemoryFreeClear(submitTbData.pCreateTbReq);
43,563!
637
  tEndDecode(pCoder);
43,564✔
638
  return code;
43,556✔
639
}
640

641
static int32_t scanSubmitDataForMeta(SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* rsp, void* data, int32_t len, int64_t ver) {
43,609✔
642
  int32_t  code = 0;
43,609✔
643
  int32_t  lino = 0;
43,609✔
644
  SDecoder decoder = {0};
43,609✔
645
  SSHashObj* gidHash = NULL;
43,609✔
646
  void* pTask = sStreamReaderInfo->pTask;
43,609✔
647

648
  tDecoderInit(&decoder, data, len);
43,609✔
649
  if (tStartDecode(&decoder) < 0) {
43,585!
650
    code = TSDB_CODE_INVALID_MSG;
×
651
    TSDB_CHECK_CODE(code, lino, end);
×
652
  }
653

654
  uint64_t nSubmitTbData = 0;
43,604✔
655
  if (tDecodeU64v(&decoder, &nSubmitTbData) < 0) {
43,587!
656
    code = TSDB_CODE_INVALID_MSG;
×
657
    TSDB_CHECK_CODE(code, lino, end);
×
658
  }
659

660
  gidHash = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
43,587✔
661
  STREAM_CHECK_NULL_GOTO(gidHash, terrno);
43,600!
662

663
  for (int32_t i = 0; i < nSubmitTbData; i++) {
87,149✔
664
    STREAM_CHECK_RET_GOTO(scanSubmitTbDataForMeta(&decoder, sStreamReaderInfo, gidHash));
43,595!
665
  }
666
  tEndDecode(&decoder);
43,554✔
667

668
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->metaBlock, ((SSDataBlock*)rsp->metaBlock)->info.rows + tSimpleHashGetSize(gidHash)));
43,527!
669
  int32_t iter = 0;
43,521✔
670
  void*   px = tSimpleHashIterate(gidHash, NULL, &iter);
43,521✔
671
  while (px != NULL) {
54,996✔
672
    WalMetaResult* pMeta = (WalMetaResult*)px;
11,428✔
673
    STREAM_CHECK_RET_GOTO(buildWalMetaBlockNew(rsp->metaBlock, pMeta->id, pMeta->skey, pMeta->ekey, ver));
11,428!
674
    ((SSDataBlock*)rsp->metaBlock)->info.rows++;
11,431✔
675
    rsp->totalRows++;
11,431✔
676
    ST_TASK_DLOG("stream reader scan submit data:skey %" PRId64 ", ekey %" PRId64 ", id %" PRIu64
11,431✔
677
          ", ver:%"PRId64, pMeta->skey, pMeta->ekey, pMeta->id, ver);
678
    px = tSimpleHashIterate(gidHash, px, &iter);
11,431✔
679
  }
680
end:
43,568✔
681
  tDecoderClear(&decoder);
43,568✔
682
  tSimpleHashCleanup( gidHash);
43,603✔
683
  return code;
43,604✔
684
}
685

686
static int32_t createBlockForTsdbMeta(SSDataBlock** pBlock, bool isVTable) {
960✔
687
  int32_t code = 0;
960✔
688
  int32_t lino = 0;
960✔
689
  SArray* schemas = taosArrayInit(8, sizeof(SSchema));
960✔
690
  STREAM_CHECK_NULL_GOTO(schemas, terrno);
960!
691

692
  int32_t index = 1;
960✔
693
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, index++))  // skey
960!
694
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_TIMESTAMP, LONG_BYTES, index++))  // ekey
960!
695
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // uid
960!
696
  if (!isVTable) {
960✔
697
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_UBIGINT, LONG_BYTES, index++))  // gid
161!
698
  }
699
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))     // nrows
960!
700

701
  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, pBlock));
960!
702

703
end:
960✔
704
  taosArrayDestroy(schemas);
960✔
705
  return code;
960✔
706
}
707

708
static int32_t createBlockForWalMetaNew(SSDataBlock** pBlock) {
605✔
709
  int32_t code = 0;
605✔
710
  int32_t lino = 0;
605✔
711
  SArray* schemas = NULL;
605✔
712

713
  schemas = taosArrayInit(8, sizeof(SSchema));
605✔
714
  STREAM_CHECK_NULL_GOTO(schemas, terrno);
607!
715

716
  int32_t index = 0;
607✔
717
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // gid non vtable/uid vtable
607!
718
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // skey
606!
719
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // ekey
604!
720
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // ver
604!
721

722
  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, pBlock));
604!
723

724
end:
607✔
725
  taosArrayDestroy(schemas);
607✔
726
  return code;
607✔
727
}
728

729
static int32_t createBlockForDropTable(SSDataBlock** pBlock) {
1✔
730
  int32_t code = 0;
1✔
731
  int32_t lino = 0;
1✔
732
  SArray* schemas = NULL;
1✔
733

734
  schemas = taosArrayInit(8, sizeof(SSchema));
1✔
735
  STREAM_CHECK_NULL_GOTO(schemas, terrno);
1!
736

737
  int32_t index = 0;
1✔
738
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // gid non vtable/uid vtable
1!
739
  STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, index++))  // ver
1!
740

741
  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, pBlock));
1!
742

743
end:
1✔
744
  taosArrayDestroy(schemas);
1✔
745
  return code;
1✔
746
}
747

748
static int32_t processMeta(int16_t msgType, SStreamTriggerReaderInfo* sStreamReaderInfo, void *data, int32_t len, SSTriggerWalNewRsp* rsp, int32_t ver) {
975✔
749
  int32_t code = 0;
975✔
750
  int32_t lino = 0;
975✔
751
  SDecoder dcoder = {0};
975✔
752
  tDecoderInit(&dcoder, data, len);
975✔
753
  if (msgType == TDMT_VND_DELETE && sStreamReaderInfo->deleteReCalc != 0) {
975✔
754
    if (rsp->deleteBlock == NULL) {
115✔
755
      STREAM_CHECK_RET_GOTO(createBlockForWalMetaNew((SSDataBlock**)&rsp->deleteBlock));
43!
756
    }
757
      
758
    STREAM_CHECK_RET_GOTO(scanDeleteDataNew(sStreamReaderInfo, rsp, data, len, ver));
115!
759
  } else if (msgType == TDMT_VND_DROP_TABLE && sStreamReaderInfo->deleteOutTbl != 0) {
860✔
760
    if (rsp->dropBlock == NULL) {
1!
761
      STREAM_CHECK_RET_GOTO(createBlockForDropTable((SSDataBlock**)&rsp->dropBlock));
1!
762
    }
763
    STREAM_CHECK_RET_GOTO(scanDropTableNew(sStreamReaderInfo, rsp, data, len, ver));
1!
764
  } else if (msgType == TDMT_VND_DROP_STB) {
859!
765
    STREAM_CHECK_RET_GOTO(scanDropSTableNew(sStreamReaderInfo, data, len));
×
766
  } else if (msgType == TDMT_VND_CREATE_TABLE) {
859✔
767
    STREAM_CHECK_RET_GOTO(scanCreateTableNew(sStreamReaderInfo, data, len));
102!
768
  } else if (msgType == TDMT_VND_ALTER_STB) {
757✔
769
    // STREAM_CHECK_RET_GOTO(scanAlterSTableNew(sStreamReaderInfo, data, len));
770
  } else if (msgType == TDMT_VND_ALTER_TABLE) {
593✔
771
    STREAM_CHECK_RET_GOTO(scanAlterTableNew(sStreamReaderInfo, data, len));
124!
772
  }
773

774
  end:
594✔
775
  tDecoderClear(&dcoder);
976✔
776
  return code;
977✔
777
}
778
static int32_t processWalVerMetaNew(SVnode* pVnode, SSTriggerWalNewRsp* rsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
15,263✔
779
                       int64_t ctime) {
780
  int32_t code = 0;
15,263✔
781
  int32_t lino = 0;
15,263✔
782
  void* pTask = sStreamReaderInfo->pTask;
15,263✔
783

784
  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
15,263✔
785
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);
15,281!
786
  code = walReaderSeekVer(pWalReader, rsp->ver);
15,281✔
787
  if (code == TSDB_CODE_WAL_LOG_NOT_EXIST){
15,270✔
788
    if (rsp->ver < walGetFirstVer(pWalReader->pWal)) {
12,189!
789
      rsp->ver = walGetFirstVer(pWalReader->pWal);
×
790
    }
791
    ST_TASK_DLOG("vgId:%d %s scan wal error:%s", TD_VID(pVnode), __func__, tstrerror(code));
12,188✔
792
    code = TSDB_CODE_SUCCESS;
12,191✔
793
    goto end;
12,191✔
794
  }
795
  STREAM_CHECK_RET_GOTO(code);
3,081!
796

797
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->metaBlock, STREAM_RETURN_ROWS_NUM));
3,081!
798
  while (1) {
44,199✔
799
    code = walNextValidMsg(pWalReader, true);
47,280✔
800
    if (code == TSDB_CODE_WAL_LOG_NOT_EXIST){\
47,280✔
801
      ST_TASK_DLOG("vgId:%d %s scan wal error:%s", TD_VID(pVnode), __func__, tstrerror(code));
3,079✔
802
      code = TSDB_CODE_SUCCESS;
3,084✔
803
      goto end;
3,084✔
804
    }
805
    STREAM_CHECK_RET_GOTO(code);
44,201!
806
    rsp->ver = pWalReader->curVersion;
44,201✔
807
    SWalCont* wCont = &pWalReader->pHead->head;
44,201✔
808
    rsp->verTime = wCont->ingestTs;
44,201✔
809
    if (wCont->ingestTs / 1000 > ctime) break;
44,201!
810
    void*   data = POINTER_SHIFT(wCont->body, sizeof(SMsgHead));
44,201✔
811
    int32_t len = wCont->bodyLen - sizeof(SMsgHead);
44,201✔
812
    int64_t ver = wCont->version;
44,201✔
813

814
    ST_TASK_DLOG("vgId:%d stream reader scan wal ver:%" PRId64 ", type:%d, deleteData:%d, deleteTb:%d",
44,201✔
815
      TD_VID(pVnode), ver, wCont->msgType, sStreamReaderInfo->deleteReCalc, sStreamReaderInfo->deleteOutTbl);
816
    if (wCont->msgType == TDMT_VND_SUBMIT) {
44,196✔
817
      data = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
43,602✔
818
      len = wCont->bodyLen - sizeof(SSubmitReq2Msg);
43,602✔
819
      STREAM_CHECK_RET_GOTO(scanSubmitDataForMeta(sStreamReaderInfo, rsp, data, len, ver));
43,602!
820
    } else {
821
      STREAM_CHECK_RET_GOTO(processMeta(wCont->msgType, sStreamReaderInfo, data, len, rsp, ver));
594!
822
    }
823

824
    if (rsp->totalRows >= STREAM_RETURN_ROWS_NUM) {
44,199!
825
      break;
×
826
    }
827
  }
828

829
end:
15,275✔
830
  walCloseReader(pWalReader);
15,275✔
831
  return code;
15,281✔
832
}
833

834
static int32_t processTag(SVnode* pVnode, SStreamTriggerReaderInfo* info, bool isCalc, SStorageAPI* api, 
7,806✔
835
  uint64_t uid, SSDataBlock* pBlock, uint32_t currentRow, uint32_t numOfRows, uint32_t numOfBlocks) {
836
  int32_t     code = 0;
7,806✔
837
  int32_t     lino = 0;
7,806✔
838
  SMetaReader mr = {0};
7,806✔
839
  SArray* tagCache = NULL;
7,806✔
840

841
  SHashObj* metaCache = isCalc ? info->pTableMetaCacheCalc : info->pTableMetaCacheTrigger;
7,806!
842
  SExprInfo*   pExprInfo = isCalc ? info->pExprInfoCalcTag : info->pExprInfoTriggerTag; 
7,806!
843
  int32_t      numOfExpr = isCalc ? info->numOfExprCalcTag : info->numOfExprTriggerTag;
7,806!
844
  if (numOfExpr == 0) {
7,806!
845
    return TSDB_CODE_SUCCESS;
×
846
  }
847

848
  void* uidData = taosHashGet(metaCache, &uid, LONG_BYTES);
7,806✔
849
  if (uidData == NULL) {
7,836✔
850
    api->metaReaderFn.initReader(&mr, pVnode, META_READER_LOCK, &api->metaFn);
320✔
851
    code = api->metaReaderFn.getEntryGetUidCache(&mr, uid);
320✔
852
    api->metaReaderFn.readerReleaseLock(&mr);
318✔
853
    STREAM_CHECK_RET_GOTO(code);
320!
854

855
    tagCache = taosArrayInit(numOfExpr, POINTER_BYTES);
320✔
856
    STREAM_CHECK_NULL_GOTO(tagCache, terrno);
320!
857
    if(taosHashPut(metaCache, &uid, LONG_BYTES, &tagCache, POINTER_BYTES) != 0) {
320✔
858
      taosArrayDestroyP(tagCache, taosMemFree);
4✔
859
      code = terrno;
×
860
      goto end;
×
861
    }
862
  } else {
863
    tagCache = *(SArray**)uidData;
7,516✔
864
    STREAM_CHECK_CONDITION_GOTO(taosArrayGetSize(tagCache) != numOfExpr, TSDB_CODE_INVALID_PARA);
7,516!
865
  }
866
  
867
  for (int32_t j = 0; j < numOfExpr; ++j) {
21,409✔
868
    const SExprInfo* pExpr1 = &pExprInfo[j];
13,618✔
869
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;
13,618✔
870

871
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
13,618✔
872
    STREAM_CHECK_NULL_GOTO(pColInfoData, terrno);
13,581!
873
    int32_t functionId = pExpr1->pExpr->_function.functionId;
13,581✔
874

875
    // this is to handle the tbname
876
    if (fmIsScanPseudoColumnFunc(functionId)) {
13,581✔
877
      int32_t fType = pExpr1->pExpr->_function.functionType;
7,756✔
878
      if (fType == FUNCTION_TYPE_TBNAME) {
7,756!
879
        char   buf[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
7,812✔
880
        if (uidData == NULL) {
7,812✔
881
          STR_TO_VARSTR(buf, mr.me.name)
320✔
882
          char* tbname = taosStrdup(mr.me.name);
320!
883
          STREAM_CHECK_NULL_GOTO(tbname, terrno);
319!
884
          STREAM_CHECK_NULL_GOTO(taosArrayPush(tagCache, &tbname), terrno);
638!
885
        } else {
886
          char* tbname = taosArrayGetP(tagCache, j);
7,492✔
887
          STR_TO_VARSTR(buf, tbname)
7,524✔
888
        }
889
        code = colDataSetNItems(pColInfoData, currentRow, buf, numOfRows, numOfBlocks, false);
7,843✔
890
        // stInfo("set pseudo column tbname:%s currentRow:%d, numOfRows:%d, dstSlotId:%d, totalRows:%"PRId64" for uid:%" PRIu64 ", %p,%p", buf + VARSTR_HEADER_SIZE, 
891
        //   currentRow, numOfRows, dstSlotId, pBlock->info.rows, uid, pColInfoData, pColInfoData->pData);
892
        pColInfoData->info.colId = -1;
7,815✔
893
      }
894
    } else {  // these are tags
895
      char* data = NULL;
5,811✔
896
      const char* p = NULL;
5,811✔
897
      STagVal tagVal = {0};
5,811✔
898
      if (uidData == NULL) {
5,811✔
899
        tagVal.cid = pExpr1->base.pParam[0].pCol->colId;
260✔
900
        p = api->metaFn.extractTagVal(mr.me.ctbEntry.pTags, pColInfoData->info.type, &tagVal);
260✔
901

902
        if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
260!
903
          data = tTagValToData((const STagVal*)p, false);
260✔
904
        } else {
905
          data = (char*)p;
×
906
        }
907

908
        if (data == NULL) {
260!
909
          STREAM_CHECK_NULL_GOTO(taosArrayPush(tagCache, &data), terrno);
×
910
        } else {
911
          int32_t len = pColInfoData->info.bytes;
260✔
912
          if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
260!
913
            len = calcStrBytesByType(pColInfoData->info.type, (char*)data);
83✔
914
          }
915
          char* pData = taosMemoryCalloc(1, len);
260!
916
          STREAM_CHECK_NULL_GOTO(pData, terrno);
261!
917
          (void)memcpy(pData, data, len);
261✔
918
          STREAM_CHECK_NULL_GOTO(taosArrayPush(tagCache, &pData), terrno);
522!
919
        }
920
      } else {
921
        data = taosArrayGetP(tagCache, j);
5,551✔
922
      }
923

924
      bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
5,807!
925
      if (isNullVal) {
5,807!
926
        colDataSetNNULL(pColInfoData, currentRow, numOfRows);
×
927
      } else {
928
        for (uint32_t i = 0; i < numOfRows && !IS_VAR_DATA_TYPE(pColInfoData->info.type); i++){
11,234!
929
          colDataClearNull_f(pColInfoData->nullbitmap, currentRow + i);
5,427✔
930
        }
931
        code = colDataSetNItems(pColInfoData, currentRow, data, numOfRows, numOfBlocks, false);
5,807✔
932
        if (uidData == NULL && pColInfoData->info.type != TSDB_DATA_TYPE_JSON && IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
5,826!
933
          taosMemoryFree(data);
84!
934
        }
935
        STREAM_CHECK_RET_GOTO(code);
5,824!
936
      }
937
    }
938
  }
939

940
end:
7,791✔
941
  api->metaReaderFn.clearReader(&mr);
7,791✔
942
  return code;
7,801✔
943
}
944

945
int32_t getRowRange(SColData* pCol, STimeWindow* window, int32_t* rowStart, int32_t* rowEnd, int32_t* nRows) {
×
946
  int32_t code = 0;
×
947
  int32_t lino = 0;
×
948
  *nRows = 0;
×
949
  *rowStart = 0;
×
950
  *rowEnd = pCol->nVal;
×
951
  if (window != NULL) {
×
952
    SColVal colVal = {0};
×
953
    *rowStart = -1;
×
954
    *rowEnd = -1;
×
955
    for (int32_t k = 0; k < pCol->nVal; k++) {
×
956
      STREAM_CHECK_RET_GOTO(tColDataGetValue(pCol, k, &colVal));
×
957
      int64_t ts = VALUE_GET_TRIVIAL_DATUM(&colVal.value);
×
958
      if (ts >= window->skey && *rowStart == -1) {
×
959
        *rowStart = k;
×
960
      }
961
      if (ts > window->ekey && *rowEnd == -1) {
×
962
        *rowEnd = k;
×
963
      }
964
    }
965
    STREAM_CHECK_CONDITION_GOTO(*rowStart == -1 || *rowStart == *rowEnd, TDB_CODE_SUCCESS);
×
966

967
    if (*rowStart != -1 && *rowEnd == -1) {
×
968
      *rowEnd = pCol->nVal;
×
969
    }
970
  }
971
  *nRows = *rowEnd - *rowStart;
×
972

973
end:
×
974
  return code;
×
975
}
976

977
static int32_t setColData(int64_t rows, int32_t rowStart, int32_t rowEnd, SColData* colData, SColumnInfoData* pColData) {
×
978
  int32_t code = 0;
×
979
  int32_t lino = 0;
×
980
  for (int32_t k = rowStart; k < rowEnd; k++) {
×
981
    SColVal colVal = {0};
×
982
    STREAM_CHECK_RET_GOTO(tColDataGetValue(colData, k, &colVal));
×
983
    STREAM_CHECK_RET_GOTO(colDataSetVal(pColData, rows + k, VALUE_GET_DATUM(&colVal.value, colVal.value.type),
×
984
                                        !COL_VAL_IS_VALUE(&colVal)));
985
  }
986
  end:
×
987
  return code;
×
988
}
989

990
static int32_t scanSubmitTbData(SVnode* pVnode, SDecoder *pCoder, SStreamTriggerReaderInfo* sStreamReaderInfo, 
20,860✔
991
  STSchema** schemas, SSHashObj* ranges, SSHashObj* gidHash, SSTriggerWalNewRsp* rsp, int64_t ver) {
992
  int32_t code = 0;
20,860✔
993
  int32_t lino = 0;
20,860✔
994
  uint64_t id = 0;
20,860✔
995
  WalMetaResult walMeta = {0};
20,860✔
996
  void* pTask = sStreamReaderInfo->pTask;
20,860✔
997
  SSDataBlock * pBlock = (SSDataBlock*)rsp->dataBlock;
20,860✔
998

999
  if (tStartDecode(pCoder) < 0) {
20,860!
1000
    code = TSDB_CODE_INVALID_MSG;
×
1001
    TSDB_CHECK_CODE(code, lino, end);
×
1002
  }
1003

1004
  SSubmitTbData submitTbData = {0};
20,898✔
1005
  uint8_t       version = 0;
20,898✔
1006
  if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) {
20,851!
1007
    code = TSDB_CODE_INVALID_MSG;
×
1008
    TSDB_CHECK_CODE(code, lino, end);
×
1009
  }
1010
  version = (submitTbData.flags >> 8) & 0xff;
20,851✔
1011
  submitTbData.flags = submitTbData.flags & 0xff;
20,851✔
1012
  // STREAM_CHECK_CONDITION_GOTO(version < 2, TDB_CODE_SUCCESS);
1013
  if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
20,851✔
1014
    if (tStartDecode(pCoder) < 0) {
106!
1015
      code = TSDB_CODE_INVALID_MSG;
×
1016
      TSDB_CHECK_CODE(code, lino, end);
×
1017
    }
1018
    tEndDecode(pCoder);
106✔
1019
  }
1020

1021
  // submit data
1022
  if (tDecodeI64(pCoder, &submitTbData.suid) < 0) {
20,823!
1023
    code = TSDB_CODE_INVALID_MSG;
×
1024
    TSDB_CHECK_CODE(code, lino, end);
×
1025
  }
1026
  if (tDecodeI64(pCoder, &submitTbData.uid) < 0) {
20,772!
1027
    code = TSDB_CODE_INVALID_MSG;
×
1028
    TSDB_CHECK_CODE(code, lino, end);
×
1029
  }
1030

1031
  STREAM_CHECK_CONDITION_GOTO(!uidInTableList(sStreamReaderInfo, submitTbData.suid, submitTbData.uid, &id, rsp->isCalc), TDB_CODE_SUCCESS);
20,772✔
1032

1033
  walMeta.id = id;
17,868✔
1034
  STimeWindow window = {.skey = INT64_MIN, .ekey = INT64_MAX};
17,868✔
1035

1036
  if (ranges != NULL){
17,868✔
1037
    void* timerange = tSimpleHashGet(ranges, &id, sizeof(id));
13,889✔
1038
    if (timerange == NULL) goto end;;
13,877!
1039
    int64_t* pRange = (int64_t*)timerange;
13,877✔
1040
    window.skey = pRange[0];
13,877✔
1041
    window.ekey = pRange[1];
13,877✔
1042
  }
1043
  
1044
  if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) {
17,921!
1045
    code = TSDB_CODE_INVALID_MSG;
×
1046
    TSDB_CHECK_CODE(code, lino, end);
×
1047
  }
1048

1049
  if (*schemas == NULL) {
17,921!
1050
    *schemas = metaGetTbTSchema(pVnode->pMeta, submitTbData.suid != 0 ? submitTbData.suid : submitTbData.uid, submitTbData.sver, 1);
17,921✔
1051
    STREAM_CHECK_NULL_GOTO(*schemas, TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND);
17,955!
1052
  }
1053

1054
  SStreamWalDataSlice* pSlice = (SStreamWalDataSlice*)tSimpleHashGet(sStreamReaderInfo->indexHash, &submitTbData.uid, LONG_BYTES);
17,955✔
1055
  STREAM_CHECK_NULL_GOTO(pSlice, TSDB_CODE_INVALID_PARA);
17,941!
1056
  int32_t blockStart = pSlice->currentRowIdx;
17,941✔
1057

1058
  int32_t numOfRows = 0;
17,941✔
1059
  if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
17,941!
1060
    uint64_t nColData = 0;
×
1061
    if (tDecodeU64v(pCoder, &nColData) < 0) {
×
1062
      code = TSDB_CODE_INVALID_MSG;
×
1063
      TSDB_CHECK_CODE(code, lino, end);
×
1064
    }
1065

1066
    SColData colData = {0};
×
1067
    code = tDecodeColData(version, pCoder, &colData, false);
×
1068
    if (code) {
×
1069
      code = TSDB_CODE_INVALID_MSG;
×
1070
      TSDB_CHECK_CODE(code, lino, end);
×
1071
    }
1072

1073
    if (colData.flag != HAS_VALUE) {
×
1074
      code = TSDB_CODE_INVALID_MSG;
×
1075
      TSDB_CHECK_CODE(code, lino, end);
×
1076
    }
1077
    
1078
    walMeta.skey = ((TSKEY *)colData.pData)[0];
×
1079
    walMeta.ekey = ((TSKEY *)colData.pData)[colData.nVal - 1];
×
1080

1081
    int32_t rowStart = 0;
×
1082
    int32_t rowEnd = 0;
×
1083
    STREAM_CHECK_RET_GOTO(getRowRange(&colData, &window, &rowStart, &rowEnd, &numOfRows));
×
1084
    STREAM_CHECK_CONDITION_GOTO(numOfRows <= 0, TDB_CODE_SUCCESS);
×
1085

1086
    int32_t pos = pCoder->pos;
×
1087
    for (int16_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
×
1088
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
×
1089
      STREAM_CHECK_NULL_GOTO(pColData, terrno);
×
1090
      if (pColData->info.colId <= -1) {
×
1091
        pColData->hasNull = true;
×
1092
        continue;
×
1093
      }
1094
      if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
×
1095
        STREAM_CHECK_RET_GOTO(setColData(blockStart, rowStart, rowEnd, &colData, pColData));
×
1096
        continue;
×
1097
      }
1098

1099
      pCoder->pos = pos;
×
1100

1101
      int16_t colId = 0;
×
1102
      if (sStreamReaderInfo->isVtableStream){
×
1103
        int64_t id[2] = {submitTbData.suid, submitTbData.uid};
×
1104
        void *px = tSimpleHashGet(rsp->isCalc ? sStreamReaderInfo->uidHashCalc : sStreamReaderInfo->uidHashTrigger, id, sizeof(id));
×
1105
        STREAM_CHECK_NULL_GOTO(px, TSDB_CODE_INVALID_PARA);
×
1106
        SSHashObj* uInfo = *(SSHashObj **)px;
×
1107
        STREAM_CHECK_NULL_GOTO(uInfo, TSDB_CODE_INVALID_PARA);
×
1108
        int16_t*  tmp = tSimpleHashGet(uInfo, &i, sizeof(i));
×
1109
        if (tmp != NULL) {
×
1110
          colId = *tmp;
×
1111
        } else {
1112
          colId = -1;
×
1113
        }
1114
      } else {
1115
        colId = pColData->info.colId;
×
1116
      }
1117
      
1118
      uint64_t j = 1;
×
1119
      for (; j < nColData; j++) {
×
1120
        int16_t cid = 0;
×
1121
        int32_t posTmp = pCoder->pos;
×
1122
        pCoder->pos += INT_BYTES;
×
1123
        if ((code = tDecodeI16v(pCoder, &cid))) return code;
×
1124
        pCoder->pos = posTmp;
×
1125
        if (cid == colId) {
×
1126
          SColData colDataTmp = {0};
×
1127
          code = tDecodeColData(version, pCoder, &colDataTmp, false);
×
1128
          if (code) {
×
1129
            code = TSDB_CODE_INVALID_MSG;
×
1130
            TSDB_CHECK_CODE(code, lino, end);
×
1131
          }
1132
          STREAM_CHECK_RET_GOTO(setColData(blockStart, rowStart, rowEnd, &colDataTmp, pColData));
×
1133
          break;
×
1134
        }
1135
        code = tDecodeColData(version, pCoder, &colData, true);
×
1136
        if (code) {
×
1137
          code = TSDB_CODE_INVALID_MSG;
×
1138
          TSDB_CHECK_CODE(code, lino, end);
×
1139
        }
1140
      }
1141
      if (j == nColData) {
×
1142
        colDataSetNNULL(pColData, blockStart, numOfRows);
×
1143
      }
1144
    }
1145
  } else {
1146
    uint64_t nRow = 0;
17,941✔
1147
    if (tDecodeU64v(pCoder, &nRow) < 0) {
17,928!
1148
      code = TSDB_CODE_INVALID_MSG;
×
1149
      TSDB_CHECK_CODE(code, lino, end);
×
1150
    }
1151
    for (int32_t iRow = 0; iRow < nRow; ++iRow) {
41,903✔
1152
      SRow *pRow = (SRow *)(pCoder->data + pCoder->pos);
24,036✔
1153
      pCoder->pos += pRow->len;
24,036✔
1154

1155
      if (iRow == 0){
24,036✔
1156
#ifndef NO_UNALIGNED_ACCESS
1157
        walMeta.skey = pRow->ts;
17,938✔
1158
#else
1159
        walMeta.skey = taosGetInt64Aligned(&pRow->ts);
1160
#endif
1161
      }
1162
      if (iRow == nRow - 1) {
24,036✔
1163
#ifndef NO_UNALIGNED_ACCESS
1164
        walMeta.ekey = pRow->ts;
17,942✔
1165
#else
1166
        walMeta.ekey = taosGetInt64Aligned(&pRow->ts);
1167
#endif
1168
      }
1169

1170
      if (pRow->ts < window.skey || pRow->ts > window.ekey) {
24,036!
1171
        continue;
1,024✔
1172
      }
1173
     
1174
      for (int16_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {  // reader todo test null
154,227✔
1175
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
130,778✔
1176
        STREAM_CHECK_NULL_GOTO(pColData, terrno);
130,895!
1177
        if (pColData->info.colId <= -1) {
131,253✔
1178
          pColData->hasNull = true;
42,016✔
1179
          continue;
42,016✔
1180
        }
1181
        int16_t colId = 0;
89,237✔
1182
        if (sStreamReaderInfo->isVtableStream){
89,237✔
1183
          int64_t id[2] = {submitTbData.suid, submitTbData.uid};
30,724✔
1184
          void* px = tSimpleHashGet(rsp->isCalc ? sStreamReaderInfo->uidHashCalc : sStreamReaderInfo->uidHashTrigger, id, sizeof(id));
30,724✔
1185
          STREAM_CHECK_NULL_GOTO(px, TSDB_CODE_INVALID_PARA);
30,716!
1186
          SSHashObj* uInfo = *(SSHashObj**)px;
30,716✔
1187
          STREAM_CHECK_NULL_GOTO(uInfo, TSDB_CODE_INVALID_PARA);
30,716!
1188
          int16_t*  tmp = tSimpleHashGet(uInfo, &i, sizeof(i));
30,716✔
1189
          if (tmp != NULL) {
30,665✔
1190
            colId = *tmp;
28,129✔
1191
          } else {
1192
            colId = -1;
2,536✔
1193
          }
1194
          ST_TASK_TLOG("%s vtable colId:%d, i:%d, uid:%" PRId64, __func__, colId, i, submitTbData.uid);
30,665!
1195
        } else {
1196
          colId = pColData->info.colId;
58,513✔
1197
        }
1198
        
1199
        SColVal colVal = {0};
89,181✔
1200
        int32_t sourceIdx = 0;
89,181✔
1201
        while (1) {
1202
          if (sourceIdx >= (*schemas)->numOfCols) {
237,801✔
1203
            break;
33,282✔
1204
          }
1205
          STREAM_CHECK_RET_GOTO(tRowGet(pRow, *schemas, sourceIdx, &colVal));
204,519!
1206
          if (colVal.cid == colId) {
204,487✔
1207
            break;
55,867✔
1208
          }
1209
          sourceIdx++;
148,620✔
1210
        }
1211
        if (colVal.cid == colId && COL_VAL_IS_VALUE(&colVal)) {
89,149✔
1212
          if (IS_VAR_DATA_TYPE(colVal.value.type) || colVal.value.type == TSDB_DATA_TYPE_DECIMAL){
53,030!
1213
            STREAM_CHECK_RET_GOTO(varColSetVarData(pColData, blockStart+ numOfRows, (const char*)colVal.value.pData, colVal.value.nData, !COL_VAL_IS_VALUE(&colVal)));
66!
1214
            // ST_TASK_ILOG("%s vtable colId:%d, i:%d, colData:%p, data:%s, len:%d, rowIndex:%d, offset:%d, uid:%" PRId64, __func__, colId, i, pColData, 
1215
            //   (const char*)colVal.value.pData, colVal.value.nData, blockStart+ numOfRows, pColData->varmeta.offset[blockStart+ numOfRows], submitTbData.uid);
1216
          } else {
1217
            STREAM_CHECK_RET_GOTO(colDataSetVal(pColData, blockStart + numOfRows, (const char*)(&(colVal.value.val)), !COL_VAL_IS_VALUE(&colVal)));
52,964!
1218
          }
1219
        } else {
1220
          colDataSetNULL(pColData, blockStart + numOfRows);
36,119✔
1221
        }
1222
      }
1223
      
1224
      numOfRows++;
22,951✔
1225
    }
1226
  }
1227

1228
  if (numOfRows > 0) {
17,867!
1229
    if (!sStreamReaderInfo->isVtableStream) {
17,876✔
1230
      SStorageAPI  api = {0};
7,581✔
1231
      initStorageAPI(&api);
7,581✔
1232
      STREAM_CHECK_RET_GOTO(processTag(pVnode, sStreamReaderInfo, rsp->isCalc, &api, submitTbData.uid, pBlock, blockStart, numOfRows, 1));
7,610!
1233
    }
1234
    
1235
    SColumnInfoData* pColData = taosArrayGetLast(pBlock->pDataBlock);
17,900✔
1236
    STREAM_CHECK_NULL_GOTO(pColData, terrno);
17,897!
1237
    STREAM_CHECK_RET_GOTO(colDataSetNItems(pColData, blockStart, (const char*)&ver, numOfRows, 1, false));
17,897!
1238
  }
1239

1240
  ST_TASK_DLOG("%s process submit data:skey %" PRId64 ", ekey %" PRId64 ", id %" PRIu64
17,921✔
1241
    ", uid:%" PRId64 ", ver:%d, row index:%d, rows:%d", __func__, window.skey, window.ekey, 
1242
    id, submitTbData.uid, submitTbData.sver, pSlice->currentRowIdx, numOfRows);
1243
  pSlice->currentRowIdx += numOfRows;
17,935✔
1244
  pBlock->info.rows += numOfRows;
17,935✔
1245
  
1246
  if (gidHash == NULL) goto end;
17,935✔
1247

1248
  WalMetaResult* data = (WalMetaResult*)tSimpleHashGet(gidHash, &walMeta.id, LONG_BYTES);
4,042✔
1249
  if (data != NULL) {
4,024!
1250
    if (walMeta.skey < data->skey) data->skey = walMeta.skey;
×
1251
    if (walMeta.ekey > data->ekey) data->ekey = walMeta.ekey;
×
1252
  } else {
1253
    STREAM_CHECK_RET_GOTO(tSimpleHashPut(gidHash, &walMeta.id, LONG_BYTES, &walMeta, sizeof(WalMetaResult)));
4,024!
1254
  }
1255

1256
end:
4,014✔
1257
  if (code != 0) {                                                             \
20,860!
1258
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code)); \
×
1259
  }
1260
  tEndDecode(pCoder);
20,860✔
1261
  return code;
20,776✔
1262
}
1263
static int32_t scanSubmitData(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamReaderInfo,
20,847✔
1264
  void* data, int32_t len, SSHashObj* ranges, SSTriggerWalNewRsp* rsp, int64_t ver) {
1265
  int32_t  code = 0;
20,847✔
1266
  int32_t  lino = 0;
20,847✔
1267
  STSchema* schemas = NULL;
20,847✔
1268
  SDecoder decoder = {0};
20,847✔
1269
  SSHashObj* gidHash = NULL;
20,847✔
1270
  void* pTask = sStreamReaderInfo->pTask;
20,847✔
1271

1272
  tDecoderInit(&decoder, data, len);
20,847✔
1273
  if (tStartDecode(&decoder) < 0) {
20,834!
1274
    code = TSDB_CODE_INVALID_MSG;
×
1275
    TSDB_CHECK_CODE(code, lino, end);
×
1276
  }
1277

1278
  uint64_t nSubmitTbData = 0;
20,877✔
1279
  if (tDecodeU64v(&decoder, &nSubmitTbData) < 0) {
20,856!
1280
    code = TSDB_CODE_INVALID_MSG;
×
1281
    TSDB_CHECK_CODE(code, lino, end);
×
1282
  }
1283

1284
  if (rsp->metaBlock != NULL){
20,856✔
1285
    gidHash = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
6,951✔
1286
    STREAM_CHECK_NULL_GOTO(gidHash, terrno);
6,975!
1287
  }
1288

1289
  for (int32_t i = 0; i < nSubmitTbData; i++) {
41,645✔
1290
    STREAM_CHECK_RET_GOTO(scanSubmitTbData(pVnode, &decoder, sStreamReaderInfo, &schemas, ranges, gidHash, rsp, ver));
20,884!
1291
  }
1292

1293
  tEndDecode(&decoder);
20,761✔
1294

1295
  if (rsp->metaBlock != NULL){
20,704✔
1296
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->metaBlock, ((SSDataBlock*)rsp->metaBlock)->info.rows + tSimpleHashGetSize(gidHash)));
6,840!
1297
    int32_t iter = 0;
6,837✔
1298
    void*   px = tSimpleHashIterate(gidHash, NULL, &iter);
6,837✔
1299
    while (px != NULL) {
10,953✔
1300
      WalMetaResult* pMeta = (WalMetaResult*)px;
4,011✔
1301
      STREAM_CHECK_RET_GOTO(buildWalMetaBlockNew(rsp->metaBlock, pMeta->id, pMeta->skey, pMeta->ekey, ver));
4,011!
1302
      ((SSDataBlock*)rsp->metaBlock)->info.rows++;
3,983✔
1303
      ST_TASK_DLOG("%s process meta data:skey %" PRId64 ", ekey %" PRId64 ", id %" PRIu64
3,983✔
1304
            ", ver:%"PRId64, __func__, pMeta->skey, pMeta->ekey, pMeta->id, ver);
1305
      px = tSimpleHashIterate(gidHash, px, &iter);
3,983✔
1306
    }
1307
  }
1308
  
1309

1310
end:
13,864✔
1311
  taosMemoryFree(schemas);
20,806!
1312
  tSimpleHashCleanup(gidHash);
20,864✔
1313
  tDecoderClear(&decoder);
20,851✔
1314
  return code;
20,905✔
1315
}
1316

1317
static int32_t scanSubmitTbDataPre(SDecoder *pCoder, SStreamTriggerReaderInfo* sStreamReaderInfo, SSHashObj* ranges, 
22,968✔
1318
  uint64_t* gid, int64_t* uid, int32_t* numOfRows, bool isCalc) {
1319
  int32_t code = 0;
22,968✔
1320
  int32_t lino = 0;
22,968✔
1321
  void* pTask = sStreamReaderInfo->pTask;
22,968✔
1322

1323
  if (tStartDecode(pCoder) < 0) {
22,968!
1324
    code = TSDB_CODE_INVALID_MSG;
×
1325
    TSDB_CHECK_CODE(code, lino, end);
×
1326
  }
1327

1328
  SSubmitTbData submitTbData = {0};
23,046✔
1329
  uint8_t       version = 0;
23,046✔
1330
  if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) {
22,968!
1331
    code = TSDB_CODE_INVALID_MSG;
×
1332
    TSDB_CHECK_CODE(code, lino, end);
×
1333
  }
1334
  version = (submitTbData.flags >> 8) & 0xff;
22,968✔
1335
  submitTbData.flags = submitTbData.flags & 0xff;
22,968✔
1336

1337
  // STREAM_CHECK_CONDITION_GOTO(version < 2, TDB_CODE_SUCCESS);
1338
  if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
22,968✔
1339
    submitTbData.pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
1,232!
1340
    STREAM_CHECK_NULL_GOTO(submitTbData.pCreateTbReq, terrno);
1,234!
1341
    STREAM_CHECK_RET_GOTO(tDecodeSVCreateTbReq(pCoder, submitTbData.pCreateTbReq));
1,234!
1342
    STREAM_CHECK_RET_GOTO(processAutoCreateTableNew(sStreamReaderInfo, submitTbData.pCreateTbReq));
1,232!
1343
  }
1344

1345
  // submit data
1346
  if (tDecodeI64(pCoder, &submitTbData.suid) < 0) {
22,866!
1347
    code = TSDB_CODE_INVALID_MSG;
×
1348
    TSDB_CHECK_CODE(code, lino, end);
×
1349
  }
1350
  if (tDecodeI64(pCoder, uid) < 0) {
22,784!
1351
    code = TSDB_CODE_INVALID_MSG;
×
1352
    TSDB_CHECK_CODE(code, lino, end);
×
1353
  }
1354

1355
  STREAM_CHECK_CONDITION_GOTO(!uidInTableList(sStreamReaderInfo, submitTbData.suid, *uid, gid, isCalc), TDB_CODE_SUCCESS);
22,784✔
1356

1357
  STimeWindow window = {.skey = INT64_MIN, .ekey = INT64_MAX};
17,752✔
1358

1359
  if (ranges != NULL){
17,752✔
1360
    void* timerange = tSimpleHashGet(ranges, gid, sizeof(*gid));
13,876✔
1361
    if (timerange == NULL) goto end;;
13,856!
1362
    int64_t* pRange = (int64_t*)timerange;
13,856✔
1363
    window.skey = pRange[0];
13,856✔
1364
    window.ekey = pRange[1];
13,856✔
1365
  }
1366
  
1367
  if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) {
17,846!
1368
    code = TSDB_CODE_INVALID_MSG;
×
1369
    TSDB_CHECK_CODE(code, lino, end);
×
1370
  }
1371

1372
  if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
17,846!
1373
    uint64_t nColData = 0;
×
1374
    if (tDecodeU64v(pCoder, &nColData) < 0) {
×
1375
      code = TSDB_CODE_INVALID_MSG;
×
1376
      TSDB_CHECK_CODE(code, lino, end);
×
1377
    }
1378

1379
    SColData colData = {0};
×
1380
    code = tDecodeColData(version, pCoder, &colData, false);
×
1381
    if (code) {
×
1382
      code = TSDB_CODE_INVALID_MSG;
×
1383
      TSDB_CHECK_CODE(code, lino, end);
×
1384
    }
1385

1386
    if (colData.flag != HAS_VALUE) {
×
1387
      code = TSDB_CODE_INVALID_MSG;
×
1388
      TSDB_CHECK_CODE(code, lino, end);
×
1389
    }
1390
    int32_t rowStart = 0;
×
1391
    int32_t rowEnd = 0;
×
1392
    if (window.skey != INT64_MIN || window.ekey != INT64_MAX) {
×
1393
      STREAM_CHECK_RET_GOTO(getRowRange(&colData, &window, &rowStart, &rowEnd, numOfRows));
×
1394
    } else {
1395
      (*numOfRows) = colData.nVal;
×
1396
    } 
1397
  } else {
1398
    uint64_t nRow = 0;
17,846✔
1399
    if (tDecodeU64v(pCoder, &nRow) < 0) {
17,830!
1400
      code = TSDB_CODE_INVALID_MSG;
×
1401
      TSDB_CHECK_CODE(code, lino, end);
×
1402
    }
1403

1404
    if (window.skey != INT64_MIN || window.ekey != INT64_MAX) { 
17,830!
1405
      for (int32_t iRow = 0; iRow < nRow; ++iRow) {
33,804✔
1406
        SRow *pRow = (SRow *)(pCoder->data + pCoder->pos);
19,919✔
1407
        pCoder->pos += pRow->len;
19,919✔
1408
        if (pRow->ts < window.skey || pRow->ts > window.ekey) {
19,919!
1409
          continue;
1,024✔
1410
        }
1411
        (*numOfRows)++;
18,895✔
1412
      }
1413
    } else {
1414
      (*numOfRows) = nRow;
3,945✔
1415
    }
1416
  }
1417
  
1418
end:
23,086✔
1419
  tDestroySVSubmitCreateTbReq(submitTbData.pCreateTbReq, TSDB_MSG_FLG_DECODE);
23,086✔
1420
  taosMemoryFreeClear(submitTbData.pCreateTbReq);
22,929!
1421
  tEndDecode(pCoder);
22,933✔
1422
  return code;
22,771✔
1423
}
1424

1425
static int32_t scanSubmitDataPre(SStreamTriggerReaderInfo* sStreamReaderInfo, void* data, int32_t len, SSHashObj* ranges, SSTriggerWalNewRsp* rsp) {
22,903✔
1426
  int32_t  code = 0;
22,903✔
1427
  int32_t  lino = 0;
22,903✔
1428
  SDecoder decoder = {0};
22,903✔
1429
  void* pTask = sStreamReaderInfo->pTask;
22,903✔
1430

1431
  tDecoderInit(&decoder, data, len);
22,903✔
1432
  if (tStartDecode(&decoder) < 0) {
22,861!
1433
    code = TSDB_CODE_INVALID_MSG;
×
1434
    TSDB_CHECK_CODE(code, lino, end);
×
1435
  }
1436

1437
  uint64_t nSubmitTbData = 0;
23,014✔
1438
  if (tDecodeU64v(&decoder, &nSubmitTbData) < 0) {
22,977!
1439
    code = TSDB_CODE_INVALID_MSG;
×
1440
    TSDB_CHECK_CODE(code, lino, end);
×
1441
  }
1442

1443
  for (int32_t i = 0; i < nSubmitTbData; i++) {
45,948✔
1444
    uint64_t gid = -1;
22,967✔
1445
    int64_t  uid = 0;
22,967✔
1446
    int32_t numOfRows = 0;
22,967✔
1447
    STREAM_CHECK_RET_GOTO(scanSubmitTbDataPre(&decoder, sStreamReaderInfo, ranges, &gid, &uid, &numOfRows, rsp->isCalc));
22,967!
1448
    if (numOfRows <= 0) {
22,747✔
1449
      continue;
5,184✔
1450
    }
1451
    rsp->totalRows += numOfRows;
17,563✔
1452

1453
    SStreamWalDataSlice* pSlice = (SStreamWalDataSlice*)tSimpleHashGet(sStreamReaderInfo->indexHash, &uid, LONG_BYTES);
17,563✔
1454
    if (pSlice != NULL) {
17,762✔
1455
      pSlice->numRows += numOfRows;
17,238✔
1456
      ST_TASK_DLOG("%s again uid:%" PRId64 ", gid:%" PRIu64 ", total numOfRows:%d", __func__, uid, gid, pSlice->numRows);
17,238✔
1457
      pSlice->gId = gid;
17,242✔
1458
    } else {
1459
      SStreamWalDataSlice tmp = {.gId=gid,.numRows=numOfRows,.currentRowIdx=0,.startRowIdx=0};
524✔
1460
      ST_TASK_DLOG("%s first uid:%" PRId64 ", gid:%" PRIu64 ", numOfRows:%d", __func__, uid, gid, tmp.numRows);
524✔
1461
      STREAM_CHECK_RET_GOTO(tSimpleHashPut(sStreamReaderInfo->indexHash, &uid, LONG_BYTES, &tmp, sizeof(tmp)));
524!
1462
    } 
1463
  }
1464

1465
  tEndDecode(&decoder);
22,981✔
1466

1467
end:
22,663✔
1468
  tDecoderClear(&decoder);
22,663✔
1469
  return code;
23,110✔
1470
}
1471

1472
static void resetIndexHash(SSHashObj* indexHash){
16,740✔
1473
  void*   pe = NULL;
16,740✔
1474
  int32_t iter = 0;
16,740✔
1475
  while ((pe = tSimpleHashIterate(indexHash, pe, &iter)) != NULL) {
39,187✔
1476
    SStreamWalDataSlice* pInfo = (SStreamWalDataSlice*)pe;
22,447✔
1477
    pInfo->startRowIdx = 0;
22,447✔
1478
    pInfo->currentRowIdx = 0;
22,447✔
1479
    pInfo->numRows = 0;
22,447✔
1480
    pInfo->gId = -1;
22,447✔
1481
  }
1482
}
16,697✔
1483

1484
static void buildIndexHash(SSHashObj* indexHash, void* pTask){
1,270✔
1485
  void*   pe = NULL;
1,270✔
1486
  int32_t iter = 0;
1,270✔
1487
  int32_t index = 0;
1,270✔
1488
  while ((pe = tSimpleHashIterate(indexHash, pe, &iter)) != NULL) {
4,897✔
1489
    SStreamWalDataSlice* pInfo = (SStreamWalDataSlice*)pe;
3,627✔
1490
    pInfo->startRowIdx = index;
3,627✔
1491
    pInfo->currentRowIdx = index;
3,627✔
1492
    index += pInfo->numRows;
3,627✔
1493
    ST_TASK_DLOG("%s uid:%" PRId64 ", gid:%" PRIu64 ", startRowIdx:%d, numRows:%d", __func__, *(int64_t*)(tSimpleHashGetKey(pe, NULL)),
5,426!
1494
    pInfo->gId, pInfo->startRowIdx, pInfo->numRows);
1495
  }
1496
}
1,271✔
1497

1498
static void printIndexHash(SSHashObj* indexHash, void* pTask){
1,270✔
1499
  void*   pe = NULL;
1,270✔
1500
  int32_t iter = 0;
1,270✔
1501
  while ((pe = tSimpleHashIterate(indexHash, pe, &iter)) != NULL) {
4,898✔
1502
    SStreamWalDataSlice* pInfo = (SStreamWalDataSlice*)pe;
3,628✔
1503
    ST_TASK_DLOG("%s uid:%" PRId64 ", gid:%" PRIu64 ", startRowIdx:%d, numRows:%d", __func__, *(int64_t*)(tSimpleHashGetKey(pe, NULL)),
5,427!
1504
    pInfo->gId, pInfo->startRowIdx, pInfo->numRows);
1505
  }
1506
}
1,268✔
1507

1508
static void filterIndexHash(SSHashObj* indexHash, SColumnInfoData* pRet){
26✔
1509
  void*   pe = NULL;
26✔
1510
  int32_t iter = 0;
26✔
1511
  int32_t index = 0;
26✔
1512
  int32_t pIndex = 0;
26✔
1513
  int8_t* pIndicator = (int8_t*)pRet->pData;
26✔
1514
  while ((pe = tSimpleHashIterate(indexHash, pe, &iter)) != NULL) {
80✔
1515
    SStreamWalDataSlice* pInfo = (SStreamWalDataSlice*)pe;
54✔
1516
    pInfo->startRowIdx = index;
54✔
1517
    int32_t size = pInfo->numRows;
54✔
1518
    for (int32_t i = 0; i < pInfo->numRows; i++) {
342✔
1519
      if (pIndicator && !pIndicator[pIndex++]) {
288!
1520
        size--;
106✔
1521
      }
1522
    }
1523
    pInfo->numRows = size;
54✔
1524
    index += pInfo->numRows;
54✔
1525
    stTrace("stream reader re build index hash uid:%" PRId64 ", gid:%" PRIu64 ", startRowIdx:%d, numRows:%d", *(int64_t*)(tSimpleHashGetKey(pe, NULL)),
54!
1526
    pInfo->gId, pInfo->startRowIdx, pInfo->numRows);
1527
  }
1528
}
26✔
1529

1530
static int32_t prepareIndexMetaData(SWalReader* pWalReader, SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* resultRsp){
8,315✔
1531
  int32_t      code = 0;
8,315✔
1532
  int32_t      lino = 0;
8,315✔
1533
  void* pTask = sStreamReaderInfo->pTask;
8,315✔
1534

1535
  code = walReaderSeekVer(pWalReader, resultRsp->ver);
8,315✔
1536
  if (code == TSDB_CODE_WAL_LOG_NOT_EXIST){
8,343✔
1537
    if (resultRsp->ver < walGetFirstVer(pWalReader->pWal)) {
7,399!
1538
      resultRsp->ver = walGetFirstVer(pWalReader->pWal);
×
1539
    }
1540
    ST_TASK_DLOG("%s scan wal error:%s",  __func__, tstrerror(code));
7,396✔
1541
    code = TSDB_CODE_SUCCESS;
7,398✔
1542
    goto end;
7,398✔
1543
  }
1544
  STREAM_CHECK_RET_GOTO(code);
944!
1545

1546
  while (1) {
9,564✔
1547
    code = walNextValidMsg(pWalReader, true);
10,508✔
1548
    if (code == TSDB_CODE_WAL_LOG_NOT_EXIST){
10,377✔
1549
      ST_TASK_DLOG("%s scan wal error:%s", __func__, tstrerror(code));
920✔
1550
      code = TSDB_CODE_SUCCESS;
942✔
1551
      goto end;
942✔
1552
    }
1553
    STREAM_CHECK_RET_GOTO(code);
9,457!
1554
    resultRsp->ver = pWalReader->curVersion;
9,457✔
1555
    SWalCont* wCont = &pWalReader->pHead->head;
9,457✔
1556
    resultRsp->verTime = wCont->ingestTs;
9,457✔
1557
    void*   data = POINTER_SHIFT(wCont->body, sizeof(SMsgHead));
9,457✔
1558
    int32_t len = wCont->bodyLen - sizeof(SMsgHead);
9,457✔
1559
    int64_t ver = wCont->version;
9,457✔
1560
    ST_TASK_DLOG("%s scan wal ver:%" PRId64 ", type:%d, deleteData:%d, deleteTb:%d", __func__,
9,457✔
1561
      ver, wCont->msgType, sStreamReaderInfo->deleteReCalc, sStreamReaderInfo->deleteOutTbl);
1562
    if (wCont->msgType == TDMT_VND_SUBMIT) {
9,425✔
1563
      data = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
9,038✔
1564
      len = wCont->bodyLen - sizeof(SSubmitReq2Msg);
9,038✔
1565
      STREAM_CHECK_RET_GOTO(scanSubmitDataPre(sStreamReaderInfo, data, len, NULL, resultRsp));
9,038!
1566
    } else if (wCont->msgType == TDMT_VND_ALTER_TABLE && resultRsp->totalRows > 0) {
387✔
1567
      resultRsp->ver--;
5✔
1568
      break;
5✔
1569
    } else {
1570
      STREAM_CHECK_RET_GOTO(processMeta(wCont->msgType, sStreamReaderInfo, data, len, resultRsp, ver));
382!
1571
    }
1572

1573
    ST_TASK_DLOG("%s scan wal next ver:%" PRId64 ", totalRows:%d", __func__, resultRsp->ver, resultRsp->totalRows);
9,559✔
1574
    if (resultRsp->totalRows >= STREAM_RETURN_ROWS_NUM) {
9,564!
1575
      break;
×
1576
    }
1577
  }
1578
  
1579
end:
8,345✔
1580
  STREAM_PRINT_LOG_END(code, lino);
8,345!
1581
  return code;
8,341✔
1582
}
1583

1584
static int32_t prepareIndexData(SWalReader* pWalReader, SStreamTriggerReaderInfo* sStreamReaderInfo, 
8,398✔
1585
  SArray* versions, SSHashObj* ranges, SSTriggerWalNewRsp* rsp){
1586
  int32_t      code = 0;
8,398✔
1587
  int32_t      lino = 0;
8,398✔
1588

1589
  for(int32_t i = 0; i < taosArrayGetSize(versions); i++) {
22,291✔
1590
    int64_t *ver = taosArrayGet(versions, i);
13,884✔
1591
    if (ver == NULL) continue;
13,890!
1592

1593
    STREAM_CHECK_RET_GOTO(walFetchHead(pWalReader, *ver));
13,890!
1594
    if(pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
13,901!
1595
      TAOS_CHECK_RETURN(walSkipFetchBody(pWalReader));
×
1596
      continue;
×
1597
    }
1598
    STREAM_CHECK_RET_GOTO(walFetchBody(pWalReader));
13,901!
1599

1600
    SWalCont* wCont = &pWalReader->pHead->head;
13,891✔
1601
    void*   pBody = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
13,891✔
1602
    int32_t bodyLen = wCont->bodyLen - sizeof(SSubmitReq2Msg);
13,891✔
1603

1604
    STREAM_CHECK_RET_GOTO(scanSubmitDataPre(sStreamReaderInfo, pBody, bodyLen, ranges, rsp));
13,891!
1605
  }
1606
  
1607
end:
8,401✔
1608
  return code;
8,401✔
1609
}
1610

1611
static int32_t filterData(SSTriggerWalNewRsp* resultRsp, SStreamTriggerReaderInfo* sStreamReaderInfo) {
1,271✔
1612
  int32_t      code = 0;
1,271✔
1613
  int32_t       lino = 0;
1,271✔
1614
  SColumnInfoData* pRet = NULL;
1,271✔
1615

1616
  int64_t totalRows = ((SSDataBlock*)resultRsp->dataBlock)->info.rows;
1,271✔
1617
  STREAM_CHECK_RET_GOTO(qStreamFilter(((SSDataBlock*)resultRsp->dataBlock), sStreamReaderInfo->pFilterInfo, &pRet));
1,271!
1618

1619
  if (((SSDataBlock*)resultRsp->dataBlock)->info.rows < totalRows) {
1,270✔
1620
    filterIndexHash(sStreamReaderInfo->indexHash, pRet);
26✔
1621
  }
1622

1623
end:
1,244✔
1624
  colDataDestroy(pRet);
1,270✔
1625
  taosMemoryFree(pRet);
1,271!
1626
  return code;
1,271✔
1627
}
1628

1629
static int32_t processWalVerMetaDataNew(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamReaderInfo, 
8,371✔
1630
                                    SSTriggerWalNewRsp* resultRsp) {
1631
  int32_t      code = 0;
8,371✔
1632
  int32_t      lino = 0;
8,371✔
1633
  void* pTask = sStreamReaderInfo->pTask;
8,371✔
1634
                                        
1635
  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
8,371✔
1636
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);
8,350!
1637
  resetIndexHash(sStreamReaderInfo->indexHash);
8,350✔
1638
  blockDataEmpty(resultRsp->dataBlock);
8,338✔
1639
  blockDataEmpty(resultRsp->metaBlock);
8,315✔
1640
  int64_t lastVer = resultRsp->ver;                                      
8,310✔
1641
  STREAM_CHECK_RET_GOTO(prepareIndexMetaData(pWalReader, sStreamReaderInfo, resultRsp));
8,310!
1642
  STREAM_CHECK_CONDITION_GOTO(resultRsp->totalRows == 0, TDB_CODE_SUCCESS);
8,342✔
1643

1644
  buildIndexHash(sStreamReaderInfo->indexHash, pTask);
313✔
1645
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(((SSDataBlock*)resultRsp->dataBlock), resultRsp->totalRows));
314!
1646
  while(lastVer < resultRsp->ver) {
7,596✔
1647
    STREAM_CHECK_RET_GOTO(walFetchHead(pWalReader, lastVer++));
7,281!
1648
    if(pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
7,265✔
1649
      TAOS_CHECK_RETURN(walSkipFetchBody(pWalReader));
289!
1650
      continue;
289✔
1651
    }
1652
    STREAM_CHECK_RET_GOTO(walFetchBody(pWalReader));
6,976!
1653
    SWalCont* wCont = &pWalReader->pHead->head;
6,964✔
1654
    void*   pBody = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
6,964✔
1655
    int32_t bodyLen = wCont->bodyLen - sizeof(SSubmitReq2Msg);
6,964✔
1656

1657
    STREAM_CHECK_RET_GOTO(scanSubmitData(pVnode, sStreamReaderInfo, pBody, bodyLen, NULL, resultRsp, wCont->version));
6,964!
1658
  }
1659

1660
  int32_t metaRows = resultRsp->totalRows - ((SSDataBlock*)resultRsp->dataBlock)->info.rows;
315✔
1661
  STREAM_CHECK_RET_GOTO(filterData(resultRsp, sStreamReaderInfo));
315!
1662
  resultRsp->totalRows = ((SSDataBlock*)resultRsp->dataBlock)->info.rows + metaRows;
313✔
1663

1664
end:
8,342✔
1665
  ST_TASK_DLOG("vgId:%d %s end, get result totalRows:%d, process:%"PRId64"/%"PRId64, TD_VID(pVnode), __func__, 
8,342✔
1666
          resultRsp->totalRows, resultRsp->ver, walGetAppliedVer(pWalReader->pWal));
1667
  walCloseReader(pWalReader);
8,342✔
1668
  return code;
8,380✔
1669
}
1670

1671
static int32_t processWalVerDataNew(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamReaderInfo, 
8,401✔
1672
                                    SArray* versions, SSHashObj* ranges, SSTriggerWalNewRsp* rsp) {
1673
  int32_t      code = 0;
8,401✔
1674
  int32_t      lino = 0;
8,401✔
1675

1676
  void* pTask = sStreamReaderInfo->pTask;
8,401✔
1677
  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
8,401✔
1678
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);
8,402!
1679
  
1680
  if (taosArrayGetSize(versions) > 0) {
8,402✔
1681
    rsp->ver = *(int64_t*)taosArrayGetLast(versions);
958✔
1682
  }
1683
  
1684
  resetIndexHash(sStreamReaderInfo->indexHash);
8,399✔
1685
  STREAM_CHECK_RET_GOTO(prepareIndexData(pWalReader, sStreamReaderInfo, versions, ranges, rsp));
8,401!
1686
  STREAM_CHECK_CONDITION_GOTO(rsp->totalRows == 0, TDB_CODE_SUCCESS);
8,400✔
1687

1688
  buildIndexHash(sStreamReaderInfo->indexHash, pTask);
957✔
1689

1690
  blockDataEmpty(rsp->dataBlock);
957✔
1691
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->dataBlock, rsp->totalRows));
958!
1692

1693
  for(int32_t i = 0; i < taosArrayGetSize(versions); i++) {
14,861✔
1694
    int64_t *ver = taosArrayGet(versions, i);
13,900✔
1695
    if (ver == NULL) continue;
13,897!
1696

1697
    STREAM_CHECK_RET_GOTO(walFetchHead(pWalReader, *ver));
13,897!
1698
    if(pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
13,905!
1699
      TAOS_CHECK_RETURN(walSkipFetchBody(pWalReader));
×
1700
      continue;
×
1701
    }
1702
    STREAM_CHECK_RET_GOTO(walFetchBody(pWalReader));
13,905!
1703
    SWalCont* wCont = &pWalReader->pHead->head;
13,906✔
1704
    void*   pBody = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
13,906✔
1705
    int32_t bodyLen = wCont->bodyLen - sizeof(SSubmitReq2Msg);
13,906✔
1706

1707
    STREAM_CHECK_RET_GOTO(scanSubmitData(pVnode, sStreamReaderInfo, pBody, bodyLen, ranges, rsp, wCont->version));
13,906!
1708
  }
1709
  // printDataBlock(rsp->dataBlock, __func__, "processWalVerDataNew");
1710
  STREAM_CHECK_RET_GOTO(filterData(rsp, sStreamReaderInfo));
957!
1711
  rsp->totalRows = ((SSDataBlock*)rsp->dataBlock)->info.rows;
957✔
1712

1713
end:
8,400✔
1714
  ST_TASK_DLOG("vgId:%d %s end, get result totalRows:%d, process:%"PRId64"/%"PRId64, TD_VID(pVnode), __func__, 
8,400✔
1715
            rsp->totalRows, rsp->ver, walGetAppliedVer(pWalReader->pWal));
1716
  walCloseReader(pWalReader);
8,399✔
1717
  return code;
8,407✔
1718
}
1719

1720
static int32_t buildScheamFromMeta(SVnode* pVnode, int64_t uid, SArray** schemas) {
837✔
1721
  int32_t code = 0;
837✔
1722
  int32_t lino = 0;
837✔
1723
  SMetaReader metaReader = {0};
837✔
1724
  SStorageAPI api = {0};
837✔
1725
  initStorageAPI(&api);
837✔
1726
  *schemas = taosArrayInit(8, sizeof(SSchema));
837✔
1727
  STREAM_CHECK_NULL_GOTO(*schemas, terrno);
837!
1728
  
1729
  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
837✔
1730
  STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, uid));
837✔
1731

1732
  SSchemaWrapper* sSchemaWrapper = NULL;
835✔
1733
  if (metaReader.me.type == TD_CHILD_TABLE) {
835!
1734
    int64_t suid = metaReader.me.ctbEntry.suid;
835✔
1735
    tDecoderClear(&metaReader.coder);
835✔
1736
    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, suid));
835!
1737
    sSchemaWrapper = &metaReader.me.stbEntry.schemaRow;
835✔
1738
  } else if (metaReader.me.type == TD_NORMAL_TABLE) {
×
1739
    sSchemaWrapper = &metaReader.me.ntbEntry.schemaRow;
×
1740
  } else {
1741
    qError("invalid table type:%d", metaReader.me.type);
×
1742
  }
1743

1744
  for (size_t j = 0; j < sSchemaWrapper->nCols; j++) {
4,862✔
1745
    SSchema* s = sSchemaWrapper->pSchema + j;
4,027✔
1746
    STREAM_CHECK_NULL_GOTO(taosArrayPush(*schemas, s), terrno);
8,054!
1747
  }
1748

1749
end:
835✔
1750
  api.metaReaderFn.clearReader(&metaReader);
837✔
1751
  STREAM_PRINT_LOG_END(code, lino);
837!
1752
  if (code != 0)  {
837✔
1753
    taosArrayDestroy(*schemas);
2✔
1754
    *schemas = NULL;
2✔
1755
  }
1756
  return code;
837✔
1757
}
1758

1759
static int32_t shrinkScheams(SArray* cols, SArray* schemas) {
835✔
1760
  int32_t code = 0;
835✔
1761
  int32_t lino = 0;
835✔
1762
  size_t  schemaLen = taosArrayGetSize(schemas);
835✔
1763
  STREAM_CHECK_RET_GOTO(taosArrayEnsureCap(schemas, schemaLen + taosArrayGetSize(cols)));
835!
1764
  for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
3,194✔
1765
    col_id_t* id = taosArrayGet(cols, i);
2,359✔
1766
    STREAM_CHECK_NULL_GOTO(id, terrno);
2,359!
1767
    for (size_t i = 0; i < schemaLen; i++) {
6,054!
1768
      SSchema* s = taosArrayGet(schemas, i);
6,054✔
1769
      STREAM_CHECK_NULL_GOTO(s, terrno);
6,054!
1770
      if (*id == s->colId) {
6,054✔
1771
        STREAM_CHECK_NULL_GOTO(taosArrayPush(schemas, s), terrno);
2,359!
1772
        break;
2,359✔
1773
      }
1774
    }
1775
  }
1776
  taosArrayPopFrontBatch(schemas, schemaLen);
835✔
1777

1778
end:
835✔
1779
  return code;
835✔
1780
}
1781

1782
static int32_t processWalVerDataVTable(SVnode* pVnode, SArray *cids, int64_t ver,
×
1783
  int64_t uid, STimeWindow* window, SSDataBlock** pBlock) {
1784
  int32_t      code = 0;
×
1785
  int32_t      lino = 0;
×
1786
  SArray*      schemas = NULL;
×
1787

1788
  SSDataBlock* pBlock2 = NULL;
×
1789

1790
  STREAM_CHECK_RET_GOTO(buildScheamFromMeta(pVnode, uid, &schemas));
×
1791
  STREAM_CHECK_RET_GOTO(shrinkScheams(cids, schemas));
×
1792
  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, &pBlock2));
×
1793

1794
  pBlock2->info.id.uid = uid;
×
1795

1796
  // STREAM_CHECK_RET_GOTO(scanWalOneVer(pVnode, pBlock2, ver, uid, window));
1797
  //printDataBlock(pBlock2, __func__, "");
1798

1799
  *pBlock = pBlock2;
×
1800
  pBlock2 = NULL;
×
1801

1802
end:
×
1803
  STREAM_PRINT_LOG_END(code, lino);
×
1804
  blockDataDestroy(pBlock2);
×
1805
  taosArrayDestroy(schemas);
×
1806
  return code;
×
1807
}
1808

1809
static int32_t createTSAndCondition(int64_t start, int64_t end, SLogicConditionNode** pCond,
×
1810
                                    STargetNode* pTargetNodeTs) {
1811
  int32_t code = 0;
×
1812
  int32_t lino = 0;
×
1813

1814
  SColumnNode*         pCol = NULL;
×
1815
  SColumnNode*         pCol1 = NULL;
×
1816
  SValueNode*          pVal = NULL;
×
1817
  SValueNode*          pVal1 = NULL;
×
1818
  SOperatorNode*       op = NULL;
×
1819
  SOperatorNode*       op1 = NULL;
×
1820
  SLogicConditionNode* cond = NULL;
×
1821

1822
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pCol));
×
1823
  pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
×
1824
  pCol->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
×
1825
  pCol->node.resType.bytes = LONG_BYTES;
×
1826
  pCol->slotId = pTargetNodeTs->slotId;
×
1827
  pCol->dataBlockId = pTargetNodeTs->dataBlockId;
×
1828

1829
  STREAM_CHECK_RET_GOTO(nodesCloneNode((SNode*)pCol, (SNode**)&pCol1));
×
1830

1831
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&pVal));
×
1832
  pVal->node.resType.type = TSDB_DATA_TYPE_BIGINT;
×
1833
  pVal->node.resType.bytes = LONG_BYTES;
×
1834
  pVal->datum.i = start;
×
1835
  pVal->typeData = start;
×
1836

1837
  STREAM_CHECK_RET_GOTO(nodesCloneNode((SNode*)pVal, (SNode**)&pVal1));
×
1838
  pVal1->datum.i = end;
×
1839
  pVal1->typeData = end;
×
1840

1841
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&op));
×
1842
  op->opType = OP_TYPE_GREATER_EQUAL;
×
1843
  op->node.resType.type = TSDB_DATA_TYPE_BOOL;
×
1844
  op->node.resType.bytes = CHAR_BYTES;
×
1845
  op->pLeft = (SNode*)pCol;
×
1846
  op->pRight = (SNode*)pVal;
×
1847
  pCol = NULL;
×
1848
  pVal = NULL;
×
1849

1850
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&op1));
×
1851
  op1->opType = OP_TYPE_LOWER_EQUAL;
×
1852
  op1->node.resType.type = TSDB_DATA_TYPE_BOOL;
×
1853
  op1->node.resType.bytes = CHAR_BYTES;
×
1854
  op1->pLeft = (SNode*)pCol1;
×
1855
  op1->pRight = (SNode*)pVal1;
×
1856
  pCol1 = NULL;
×
1857
  pVal1 = NULL;
×
1858

1859
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_LOGIC_CONDITION, (SNode**)&cond));
×
1860
  cond->condType = LOGIC_COND_TYPE_AND;
×
1861
  cond->node.resType.type = TSDB_DATA_TYPE_BOOL;
×
1862
  cond->node.resType.bytes = CHAR_BYTES;
×
1863
  STREAM_CHECK_RET_GOTO(nodesMakeList(&cond->pParameterList));
×
1864
  STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)op));
×
1865
  op = NULL;
×
1866
  STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)op1));
×
1867
  op1 = NULL;
×
1868

1869
  *pCond = cond;
×
1870

1871
end:
×
1872
  if (code != 0) {
×
1873
    nodesDestroyNode((SNode*)pCol);
×
1874
    nodesDestroyNode((SNode*)pCol1);
×
1875
    nodesDestroyNode((SNode*)pVal);
×
1876
    nodesDestroyNode((SNode*)pVal1);
×
1877
    nodesDestroyNode((SNode*)op);
×
1878
    nodesDestroyNode((SNode*)op1);
×
1879
    nodesDestroyNode((SNode*)cond);
×
1880
  }
1881
  STREAM_PRINT_LOG_END(code, lino);
×
1882

1883
  return code;
×
1884
}
1885

1886
/*
1887
static int32_t createExternalConditions(SStreamRuntimeFuncInfo* data, SLogicConditionNode** pCond, STargetNode* pTargetNodeTs, STimeRangeNode* node) {
1888
  int32_t              code = 0;
1889
  int32_t              lino = 0;
1890
  SLogicConditionNode* pAndCondition = NULL;
1891
  SLogicConditionNode* cond = NULL;
1892

1893
  if (pTargetNodeTs == NULL) {
1894
    vError("stream reader %s no ts column", __func__);
1895
    return TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN;
1896
  }
1897
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_LOGIC_CONDITION, (SNode**)&cond));
1898
  cond->condType = LOGIC_COND_TYPE_OR;
1899
  cond->node.resType.type = TSDB_DATA_TYPE_BOOL;
1900
  cond->node.resType.bytes = CHAR_BYTES;
1901
  STREAM_CHECK_RET_GOTO(nodesMakeList(&cond->pParameterList));
1902

1903
  for (int i = 0; i < taosArrayGetSize(data->pStreamPesudoFuncVals); ++i) {
1904
    data->curIdx = i;
1905

1906
    SReadHandle handle = {0};
1907
    calcTimeRange(node, data, &handle.winRange, &handle.winRangeValid);
1908
    if (!handle.winRangeValid) {
1909
      stError("stream reader %s invalid time range, skey:%" PRId64 ", ekey:%" PRId64, __func__, handle.winRange.skey,
1910
              handle.winRange.ekey);
1911
      continue;
1912
    }
1913
    STREAM_CHECK_RET_GOTO(createTSAndCondition(handle.winRange.skey, handle.winRange.ekey, &pAndCondition, pTargetNodeTs));
1914
    stDebug("%s create condition skey:%" PRId64 ", eksy:%" PRId64, __func__, handle.winRange.skey, handle.winRange.ekey);
1915
    STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)pAndCondition));
1916
    pAndCondition = NULL;
1917
  }
1918

1919
  *pCond = cond;
1920

1921
end:
1922
  if (code != 0) {
1923
    nodesDestroyNode((SNode*)pAndCondition);
1924
    nodesDestroyNode((SNode*)cond);
1925
  }
1926
  STREAM_PRINT_LOG_END(code, lino);
1927

1928
  return code;
1929
}
1930
*/
1931

1932
static int32_t processCalaTimeRange(SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo, SResFetchReq* req,
1,598✔
1933
                                    STimeRangeNode* node, SReadHandle* handle) {
1934
  int32_t code = 0;
1,598✔
1935
  int32_t lino = 0;
1,598✔
1936
  SArray* funcVals = NULL;
1,598✔
1937
  if (req->pStRtFuncInfo->withExternalWindow) {
1,598✔
1938
/*
1939
    nodesDestroyNode(sStreamReaderCalcInfo->tsConditions);
1940
    filterFreeInfo(sStreamReaderCalcInfo->pFilterInfo);
1941
    sStreamReaderCalcInfo->pFilterInfo = NULL;
1942

1943
    STREAM_CHECK_RET_GOTO(createExternalConditions(req->pStRtFuncInfo,
1944
                                                   (SLogicConditionNode**)&sStreamReaderCalcInfo->tsConditions,
1945
                                                   sStreamReaderCalcInfo->pTargetNodeTs, node));
1946

1947
    STREAM_CHECK_RET_GOTO(filterInitFromNode((SNode*)sStreamReaderCalcInfo->tsConditions,
1948
                                             (SFilterInfo**)&sStreamReaderCalcInfo->pFilterInfo,
1949
                                             FLT_OPTION_NO_REWRITE | FLT_OPTION_SCALAR_MODE, NULL));
1950
*/                                             
1951
    sStreamReaderCalcInfo->tmpRtFuncInfo.curIdx = 0;
341✔
1952
    sStreamReaderCalcInfo->tmpRtFuncInfo.triggerType = req->pStRtFuncInfo->triggerType;
341✔
1953
    
1954
    SSTriggerCalcParam* pFirst = taosArrayGet(req->pStRtFuncInfo->pStreamPesudoFuncVals, 0);
341✔
1955
    SSTriggerCalcParam* pLast = taosArrayGetLast(req->pStRtFuncInfo->pStreamPesudoFuncVals);
341✔
1956
    STREAM_CHECK_NULL_GOTO(pFirst, terrno);
341!
1957
    STREAM_CHECK_NULL_GOTO(pLast, terrno);
341!
1958

1959
    if (!node->needCalc) {
341✔
1960
      handle->winRange.skey = pFirst->wstart;
239✔
1961
      handle->winRange.ekey = pLast->wend;
239✔
1962
      handle->winRangeValid = true;
239✔
1963
      if (req->pStRtFuncInfo->triggerType == STREAM_TRIGGER_SLIDING) {
239✔
1964
        handle->winRange.ekey--;
134✔
1965
      }
1966
    } else {
1967
      SSTriggerCalcParam* pTmp = taosArrayGet(sStreamReaderCalcInfo->tmpRtFuncInfo.pStreamPesudoFuncVals, 0);
102✔
1968
      memcpy(pTmp, pFirst, sizeof(*pTmp));
102✔
1969

1970
      STREAM_CHECK_RET_GOTO(streamCalcCurrWinTimeRange(node, &sStreamReaderCalcInfo->tmpRtFuncInfo, &handle->winRange, &handle->winRangeValid, 1));
102!
1971
      if (handle->winRangeValid) {
102!
1972
        int64_t skey = handle->winRange.skey;
102✔
1973

1974
        memcpy(pTmp, pLast, sizeof(*pTmp));
102✔
1975
        STREAM_CHECK_RET_GOTO(streamCalcCurrWinTimeRange(node, &sStreamReaderCalcInfo->tmpRtFuncInfo, &handle->winRange, &handle->winRangeValid, 2));
102!
1976

1977
        if (handle->winRangeValid) {
102!
1978
          handle->winRange.skey = skey;
102✔
1979
        }
1980
      }
1981
      handle->winRange.ekey--;
102✔
1982
    }
1983
  } else {
1984
    if (!node->needCalc) {
1,257✔
1985
      SSTriggerCalcParam* pCurr = taosArrayGet(req->pStRtFuncInfo->pStreamPesudoFuncVals, req->pStRtFuncInfo->curIdx);
777✔
1986
      handle->winRange.skey = pCurr->wstart;
777✔
1987
      handle->winRange.ekey = pCurr->wend;
777✔
1988
      handle->winRangeValid = true;
777✔
1989
      if (req->pStRtFuncInfo->triggerType == STREAM_TRIGGER_SLIDING) {
777✔
1990
        handle->winRange.ekey--;
765✔
1991
      }
1992
    } else {
1993
      STREAM_CHECK_RET_GOTO(streamCalcCurrWinTimeRange(node, req->pStRtFuncInfo, &handle->winRange, &handle->winRangeValid, 3));
480!
1994
      handle->winRange.ekey--;
480✔
1995
    }
1996
  }
1997

1998
  stDebug("%s withExternalWindow is %d, skey:%" PRId64 ", ekey:%" PRId64 ", validRange:%d", 
1,598✔
1999
      __func__, req->pStRtFuncInfo->withExternalWindow, handle->winRange.skey, handle->winRange.ekey, handle->winRangeValid);
2000

2001
end:
32✔
2002
  taosArrayDestroy(funcVals);
1,598✔
2003
  return code;
1,598✔
2004
}
2005

2006
static int32_t processTs(SVnode* pVnode, SStreamTsResponse* tsRsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
1,037✔
2007
                                  SStreamReaderTaskInner* pTaskInner) {
2008
  int32_t code = 0;
1,037✔
2009
  int32_t lino = 0;
1,037✔
2010

2011
  void* pTask = sStreamReaderInfo->pTask;
1,037✔
2012
  tsRsp->tsInfo = taosArrayInit(qStreamGetTableListGroupNum(pTaskInner->pTableList), sizeof(STsInfo));
1,037✔
2013
  STREAM_CHECK_NULL_GOTO(tsRsp->tsInfo, terrno);
1,037!
2014
  while (true) {
2,518✔
2015
    bool hasNext = false;
3,555✔
2016
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
3,555!
2017
    if (hasNext) {
3,554✔
2018
      pTaskInner->api.tsdReader.tsdReaderReleaseDataBlock(pTaskInner->pReader);
1,008✔
2019
      STsInfo* tsInfo = taosArrayReserve(tsRsp->tsInfo, 1);
1,007✔
2020
      STREAM_CHECK_NULL_GOTO(tsInfo, terrno)
1,007!
2021
      if (pTaskInner->options.order == TSDB_ORDER_ASC) {
1,007✔
2022
        tsInfo->ts = pTaskInner->pResBlock->info.window.skey;
618✔
2023
      } else {
2024
        tsInfo->ts = pTaskInner->pResBlock->info.window.ekey;
389✔
2025
      }
2026
      tsInfo->gId = (sStreamReaderInfo->groupByTbname || sStreamReaderInfo->tableType != TSDB_SUPER_TABLE) ? 
133✔
2027
                    pTaskInner->pResBlock->info.id.uid : qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
1,140✔
2028
      ST_TASK_DLOG("vgId:%d %s get ts:%" PRId64 ", gId:%" PRIu64 ", ver:%" PRId64, TD_VID(pVnode), __func__, tsInfo->ts,
1,007✔
2029
              tsInfo->gId, tsRsp->ver);
2030
    }
2031
    
2032
    pTaskInner->currentGroupIndex++;
3,554✔
2033
    if (pTaskInner->currentGroupIndex >= qStreamGetTableListGroupNum(pTaskInner->pTableList) || pTaskInner->options.gid != 0) {
3,554✔
2034
      break;
2035
    }
2036
    STREAM_CHECK_RET_GOTO(resetTsdbReader(pTaskInner));
2,518!
2037
  }
2038

2039
end:
1,033✔
2040
  STREAM_PRINT_LOG_END_WITHID(code, lino);
1,033!
2041
  return code;
1,036✔
2042
}
2043

2044
static int32_t vnodeProcessStreamSetTableReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
196✔
2045
  int32_t code = 0;
196✔
2046
  int32_t lino = 0;
196✔
2047
  void*   buf = NULL;
196✔
2048
  size_t  size = 0;
196✔
2049
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
196!
2050
  void* pTask = sStreamReaderInfo->pTask;
196✔
2051

2052
  ST_TASK_DLOG("vgId:%d %s start, trigger hash size:%d, calc hash size:%d", TD_VID(pVnode), __func__,
196✔
2053
                tSimpleHashGetSize(req->setTableReq.uidInfoTrigger), tSimpleHashGetSize(req->setTableReq.uidInfoCalc));
2054

2055
  TSWAP(sStreamReaderInfo->uidHashTrigger, req->setTableReq.uidInfoTrigger);
196✔
2056
  TSWAP(sStreamReaderInfo->uidHashCalc, req->setTableReq.uidInfoCalc);
196✔
2057
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidHashTrigger, TSDB_CODE_INVALID_PARA);
196!
2058
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidHashCalc, TSDB_CODE_INVALID_PARA);
196!
2059

2060
  sStreamReaderInfo->isVtableStream = true;
196✔
2061
  sStreamReaderInfo->groupByTbname = true;
196✔
2062
end:
196✔
2063
  STREAM_PRINT_LOG_END_WITHID(code, lino);
196!
2064
  SRpcMsg rsp = {
196✔
2065
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
2066
  tmsgSendRsp(&rsp);
196✔
2067
  return code;
196✔
2068
}
2069

2070
static int32_t vnodeProcessStreamLastTsReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
554✔
2071
  int32_t                 code = 0;
554✔
2072
  int32_t                 lino = 0;
554✔
2073
  SStreamReaderTaskInner* pTaskInner = NULL;
554✔
2074
  SStreamTsResponse       lastTsRsp = {0};
554✔
2075
  void*                   buf = NULL;
554✔
2076
  size_t                  size = 0;
554✔
2077

2078
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
554!
2079
  void* pTask = sStreamReaderInfo->pTask;
554✔
2080

2081
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
554✔
2082

2083
  BUILD_OPTION(options, sStreamReaderInfo, -1, TSDB_ORDER_DESC, INT64_MIN, INT64_MAX, sStreamReaderInfo->tsSchemas, true,
554✔
2084
               STREAM_SCAN_GROUP_ONE_BY_ONE, 0, true, sStreamReaderInfo->uidHashTrigger);
2085
  SStorageAPI api = {0};
554✔
2086
  initStorageAPI(&api);
554✔
2087
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, &api));
554!
2088

2089
  lastTsRsp.ver = pVnode->state.applied + 1;
554✔
2090

2091
  STREAM_CHECK_RET_GOTO(processTs(pVnode, &lastTsRsp, sStreamReaderInfo, pTaskInner));
554!
2092
  ST_TASK_DLOG("vgId:%d %s get result, ver:%" PRId64, TD_VID(pVnode), __func__, lastTsRsp.ver);
554✔
2093
  STREAM_CHECK_RET_GOTO(buildTsRsp(&lastTsRsp, &buf, &size))
554!
2094
  if (stDebugFlag & DEBUG_DEBUG) {
554✔
2095
    int32_t nInfo = taosArrayGetSize(lastTsRsp.tsInfo);
436✔
2096
    for (int32_t i = 0; i < nInfo; i++) {
785✔
2097
      STsInfo* tsInfo = TARRAY_GET_ELEM(lastTsRsp.tsInfo, i);
349✔
2098
      ST_TASK_DLOG("vgId:%d %s get ts:%" PRId64 ", gId:%" PRIu64, TD_VID(pVnode), __func__, tsInfo->ts, tsInfo->gId);
349!
2099
    }
2100
  }
2101

2102
end:
554✔
2103
  STREAM_PRINT_LOG_END_WITHID(code, lino);
554!
2104
  SRpcMsg rsp = {
554✔
2105
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
2106
  tmsgSendRsp(&rsp);
554✔
2107
  taosArrayDestroy(lastTsRsp.tsInfo);
554✔
2108
  releaseStreamTask(&pTaskInner);
554✔
2109
  return code;
553✔
2110
}
2111

2112
static int32_t vnodeProcessStreamFirstTsReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
495✔
2113
  int32_t                 code = 0;
495✔
2114
  int32_t                 lino = 0;
495✔
2115
  SStreamReaderTaskInner* pTaskInner = NULL;
495✔
2116
  SStreamTsResponse       firstTsRsp = {0};
495✔
2117
  void*                   buf = NULL;
495✔
2118
  size_t                  size = 0;
495✔
2119

2120
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
495!
2121
  void* pTask = sStreamReaderInfo->pTask;
495✔
2122
  ST_TASK_DLOG("vgId:%d %s start, startTime:%"PRId64" ver:%"PRId64" gid:%"PRId64, TD_VID(pVnode), __func__, req->firstTsReq.startTime, req->firstTsReq.ver, req->firstTsReq.gid);
495✔
2123
  BUILD_OPTION(options, sStreamReaderInfo, req->firstTsReq.ver, TSDB_ORDER_ASC, req->firstTsReq.startTime, INT64_MAX, sStreamReaderInfo->tsSchemas, true,
495✔
2124
               STREAM_SCAN_GROUP_ONE_BY_ONE, req->firstTsReq.gid, true, sStreamReaderInfo->uidHashTrigger);
2125
  SStorageAPI api = {0};
495✔
2126
  initStorageAPI(&api);
495✔
2127
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, &api));
495✔
2128
  
2129
  firstTsRsp.ver = pVnode->state.applied;
483✔
2130
  STREAM_CHECK_RET_GOTO(processTs(pVnode, &firstTsRsp, sStreamReaderInfo, pTaskInner));
483!
2131

2132
  ST_TASK_DLOG("vgId:%d %s get result size:%"PRIzu", ver:%"PRId64, TD_VID(pVnode), __func__, taosArrayGetSize(firstTsRsp.tsInfo), firstTsRsp.ver);
482✔
2133
  STREAM_CHECK_RET_GOTO(buildTsRsp(&firstTsRsp, &buf, &size));
482!
2134
  if (stDebugFlag & DEBUG_DEBUG) {
482✔
2135
    int32_t nInfo = taosArrayGetSize(firstTsRsp.tsInfo);
356✔
2136
    for (int32_t i = 0; i < nInfo; i++) {
814✔
2137
      STsInfo* tsInfo = TARRAY_GET_ELEM(firstTsRsp.tsInfo, i);
458✔
2138
      ST_TASK_DLOG("vgId:%d %s get ts:%" PRId64 ", gId:%" PRIu64, TD_VID(pVnode), __func__, tsInfo->ts, tsInfo->gId);
458!
2139
    }
2140
  }
2141

2142
end:
482✔
2143
  STREAM_PRINT_LOG_END_WITHID(code, lino);
494!
2144
  SRpcMsg rsp = {
494✔
2145
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
2146
  tmsgSendRsp(&rsp);
494✔
2147
  taosArrayDestroy(firstTsRsp.tsInfo);
495✔
2148
  releaseStreamTask(&pTaskInner);
494✔
2149
  return code;
494✔
2150
}
2151

2152
static int32_t vnodeProcessStreamTsdbMetaReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
972✔
2153
  int32_t code = 0;
972✔
2154
  int32_t lino = 0;
972✔
2155
  void*   buf = NULL;
972✔
2156
  size_t  size = 0;
972✔
2157

2158
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
972!
2159
  void* pTask = sStreamReaderInfo->pTask;
972✔
2160
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
972✔
2161

2162
  SStreamReaderTaskInner* pTaskInner = NULL;
972✔
2163
  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_META);
972✔
2164

2165
  if (req->base.type == STRIGGER_PULL_TSDB_META) {
972!
2166
    BUILD_OPTION(options, sStreamReaderInfo, req->tsdbMetaReq.ver, req->tsdbMetaReq.order, req->tsdbMetaReq.startTime, req->tsdbMetaReq.endTime, sStreamReaderInfo->tsSchemas, true, 
972✔
2167
      (req->tsdbMetaReq.gid != 0 ? STREAM_SCAN_GROUP_ONE_BY_ONE : STREAM_SCAN_ALL), req->tsdbMetaReq.gid, true, sStreamReaderInfo->uidHashTrigger);
2168
    SStorageAPI api = {0};
972✔
2169
    initStorageAPI(&api);
972✔
2170
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, &api));
972✔
2171
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
960!
2172
    
2173
    STREAM_CHECK_RET_GOTO(createBlockForTsdbMeta(&pTaskInner->pResBlockDst, sStreamReaderInfo->isVtableStream));
960!
2174
  } else {
2175
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
×
2176
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
×
2177
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
×
2178
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
×
2179
  }
2180

2181
  blockDataCleanup(pTaskInner->pResBlockDst);
960✔
2182
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pTaskInner->pResBlockDst, STREAM_RETURN_ROWS_NUM));
960!
2183
  bool hasNext = true;
960✔
2184
  while (true) {
412✔
2185
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
1,372!
2186
    if (!hasNext) {
1,372✔
2187
      break;
960✔
2188
    }
2189
    pTaskInner->api.tsdReader.tsdReaderReleaseDataBlock(pTaskInner->pReader);
412✔
2190
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
412✔
2191

2192
    int32_t index = 0;
412✔
2193
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.window.skey));
412!
2194
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.window.ekey));
412!
2195
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.id.uid));
412!
2196
    if (!sStreamReaderInfo->isVtableStream) {
412✔
2197
      STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.id.groupId));
76!
2198
    }
2199
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.rows));
412!
2200

2201
    stDebug("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
412✔
2202
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
2203
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
2204
            pTaskInner->pResBlockDst->info.rows++;
412✔
2205
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
412!
2206
      break;
×
2207
    }
2208
  }
2209

2210
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
960✔
2211
  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
960!
2212
  printDataBlock(pTaskInner->pResBlockDst, __func__, "meta", ((SStreamTask *)sStreamReaderInfo->pTask)->streamId);
960✔
2213
  if (!hasNext) {
960!
2214
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
960!
2215
  }
2216

2217
end:
960✔
2218
  STREAM_PRINT_LOG_END_WITHID(code, lino);
972!
2219
  SRpcMsg rsp = {
972✔
2220
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
2221
  tmsgSendRsp(&rsp);
972✔
2222
  return code;
972✔
2223
}
2224

2225
static int32_t vnodeProcessStreamTsdbTsDataReqNonVTable(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
43✔
2226
  int32_t                 code = 0;
43✔
2227
  int32_t                 lino = 0;
43✔
2228
  SStreamReaderTaskInner* pTaskInner = NULL;
43✔
2229
  void*                   buf = NULL;
43✔
2230
  size_t                  size = 0;
43✔
2231
  SSDataBlock*            pBlockRes = NULL;
43✔
2232

2233
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
43!
2234
  void* pTask = sStreamReaderInfo->pTask;
43✔
2235
  ST_TASK_DLOG("vgId:%d %s start, ver:%"PRId64",skey:%"PRId64",ekey:%"PRId64",uid:%"PRId64",suid:%"PRId64, TD_VID(pVnode), __func__, req->tsdbTsDataReq.ver, 
43!
2236
                req->tsdbTsDataReq.skey, req->tsdbTsDataReq.ekey, 
2237
                req->tsdbTsDataReq.uid, req->tsdbTsDataReq.suid);
2238

2239
  BUILD_OPTION(options, sStreamReaderInfo, req->tsdbTsDataReq.ver, TSDB_ORDER_ASC, req->tsdbTsDataReq.skey, req->tsdbTsDataReq.ekey,
43✔
2240
               sStreamReaderInfo->triggerCols, false, STREAM_SCAN_ALL, 0, true, NULL);
2241
  options.uid = req->tsdbTsDataReq.uid;
43✔
2242
  SStorageAPI api = {0};
43✔
2243
  initStorageAPI(&api);
43✔
2244
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, &api));
43!
2245
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
43!
2246
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->tsBlock, false, &pBlockRes));
43!
2247

2248
  while (1) {
43✔
2249
    bool hasNext = false;
86✔
2250
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
86!
2251
    if (!hasNext) {
86✔
2252
      break;
43✔
2253
    }
2254
    if (!sStreamReaderInfo->isVtableStream){
43!
2255
      pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
43✔
2256
    }
2257

2258
    SSDataBlock* pBlock = NULL;
43✔
2259
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
43!
2260
    if (pBlock != NULL && pBlock->info.rows > 0) {
43!
2261
      STREAM_CHECK_RET_GOTO(processTag(pVnode, sStreamReaderInfo, false, &api, pBlock->info.id.uid, pBlock,
43!
2262
          0, pBlock->info.rows, 1));
2263
    }
2264
    
2265
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, pTaskInner->pFilterInfo, NULL));
43!
2266
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
43!
2267
    ST_TASK_DLOG("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
43!
2268
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
2269
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
2270
  }
2271

2272
  blockDataTransform(pBlockRes, pTaskInner->pResBlockDst);
43✔
2273

2274
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
43!
2275
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));
43!
2276

2277
end:
43✔
2278
  STREAM_PRINT_LOG_END_WITHID(code, lino);
43!
2279
  SRpcMsg rsp = {
43✔
2280
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
2281
  tmsgSendRsp(&rsp);
43✔
2282
  blockDataDestroy(pBlockRes);
43✔
2283

2284
  releaseStreamTask(&pTaskInner);
43✔
2285
  return code;
43✔
2286
}
2287

2288
static int32_t vnodeProcessStreamTsdbTsDataReqVTable(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
1✔
2289
  int32_t                 code = 0;
1✔
2290
  int32_t                 lino = 0;
1✔
2291
  SStreamReaderTaskInner* pTaskInner = NULL;
1✔
2292
  void*                   buf = NULL;
1✔
2293
  size_t                  size = 0;
1✔
2294
  SSDataBlock*            pBlockRes = NULL;
1✔
2295

2296
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
1!
2297
  void* pTask = sStreamReaderInfo->pTask;
1✔
2298
  ST_TASK_ELOG("vgId:%d %s start, ver:%"PRId64",skey:%"PRId64",ekey:%"PRId64",uid:%"PRId64",suid:%"PRId64, TD_VID(pVnode), __func__, req->tsdbTsDataReq.ver, 
1!
2299
                req->tsdbTsDataReq.skey, req->tsdbTsDataReq.ekey, 
2300
                req->tsdbTsDataReq.uid, req->tsdbTsDataReq.suid);
2301

2302
  BUILD_OPTION(options, sStreamReaderInfo, req->tsdbTsDataReq.ver, TSDB_ORDER_ASC, req->tsdbTsDataReq.skey, req->tsdbTsDataReq.ekey,
1✔
2303
               sStreamReaderInfo->tsSchemas, true, STREAM_SCAN_ALL, 0, true, NULL);
2304
  options.suid = req->tsdbTsDataReq.suid;
1✔
2305
  options.uid = req->tsdbTsDataReq.uid;
1✔
2306
  SStorageAPI api = {0};
1✔
2307
  initStorageAPI(&api);
1✔
2308
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->tsBlock, &api));
1!
2309
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->tsBlock, false, &pBlockRes));
1!
2310

2311
  while (1) {
1✔
2312
    bool hasNext = false;
2✔
2313
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
2!
2314
    if (!hasNext) {
2✔
2315
      break;
1✔
2316
    }
2317

2318
    SSDataBlock* pBlock = NULL;
1✔
2319
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
1!
2320
    STREAM_CHECK_RET_GOTO(blockDataMerge(pBlockRes, pBlock));
1!
2321
    ST_TASK_DLOG("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
1!
2322
            TD_VID(pVnode), __func__, pBlockRes->info.window.skey, pBlockRes->info.window.ekey,
2323
            pBlockRes->info.id.uid, pBlockRes->info.id.groupId, pBlockRes->info.rows);
2324
  }
2325

2326
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlockRes->info.rows);
1!
2327
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));
1!
2328

2329
end:
1✔
2330
  STREAM_PRINT_LOG_END_WITHID(code, lino);
1!
2331
  SRpcMsg rsp = {
1✔
2332
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
2333
  tmsgSendRsp(&rsp);
1✔
2334
  blockDataDestroy(pBlockRes);
1✔
2335

2336
  releaseStreamTask(&pTaskInner);
1✔
2337
  return code;
1✔
2338
}
2339

2340
static int32_t vnodeProcessStreamTsdbTriggerDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
293✔
2341
  int32_t code = 0;
293✔
2342
  int32_t lino = 0;
293✔
2343
  void*   buf = NULL;
293✔
2344
  size_t  size = 0;
293✔
2345

2346
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
293!
2347
  SStreamReaderTaskInner* pTaskInner = NULL;
293✔
2348
  void* pTask = sStreamReaderInfo->pTask;
293✔
2349
  ST_TASK_DLOG("vgId:%d %s start. ver:%"PRId64",order:%d,startTs:%"PRId64",gid:%"PRId64, TD_VID(pVnode), __func__, req->tsdbTriggerDataReq.ver, req->tsdbTriggerDataReq.order, req->tsdbTriggerDataReq.startTime, req->tsdbTriggerDataReq.gid);
293✔
2350
  
2351
  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_TRIGGER_DATA);
293✔
2352

2353
  if (req->base.type == STRIGGER_PULL_TSDB_TRIGGER_DATA) {
293✔
2354
    BUILD_OPTION(options, sStreamReaderInfo, req->tsdbTriggerDataReq.ver, req->tsdbTriggerDataReq.order, req->tsdbTriggerDataReq.startTime, INT64_MAX,
141✔
2355
                 sStreamReaderInfo->triggerCols, false, (req->tsdbTriggerDataReq.gid != 0 ? STREAM_SCAN_GROUP_ONE_BY_ONE : STREAM_SCAN_ALL), 
2356
                 req->tsdbTriggerDataReq.gid, true, NULL);
2357
    SStorageAPI api = {0};
141✔
2358
    initStorageAPI(&api);
141✔
2359
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, &api));
141!
2360

2361
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
141!
2362
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
141!
2363
  } else {
2364
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
152✔
2365
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
152!
2366
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
152✔
2367
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
152!
2368
  }
2369

2370
  blockDataCleanup(pTaskInner->pResBlockDst);
293✔
2371
  bool hasNext = true;
293✔
2372
  while (1) {
×
2373
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
293!
2374
    if (!hasNext) {
293✔
2375
      break;
141✔
2376
    }
2377
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
152✔
2378
    pTaskInner->pResBlockDst->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
152✔
2379

2380
    SSDataBlock* pBlock = NULL;
152✔
2381
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
152!
2382
    if (pBlock != NULL && pBlock->info.rows > 0) {
152!
2383
      STREAM_CHECK_RET_GOTO(
152!
2384
        processTag(pVnode, sStreamReaderInfo, false, &pTaskInner->api, pBlock->info.id.uid, pBlock, 0, pBlock->info.rows, 1));
2385
    }
2386
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, pTaskInner->pFilterInfo, NULL));
152!
2387
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
152!
2388
    ST_TASK_DLOG("vgId:%d %s get skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
152✔
2389
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
2390
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
2391
    if (pTaskInner->pResBlockDst->info.rows >= 0) { //todo
152!
2392
      break;
152✔
2393
    }
2394
  }
2395

2396
  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
293!
2397
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
293✔
2398
  if (!hasNext) {
293✔
2399
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
141!
2400
  }
2401

2402
end:
293✔
2403
  STREAM_PRINT_LOG_END_WITHID(code, lino);
293!
2404
  SRpcMsg rsp = {
293✔
2405
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
2406
  tmsgSendRsp(&rsp);
293✔
2407

2408
  return code;
293✔
2409
}
2410

2411
static int32_t vnodeProcessStreamTsdbCalcDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
28,789✔
2412
  int32_t code = 0;
28,789✔
2413
  int32_t lino = 0;
28,789✔
2414
  void*   buf = NULL;
28,789✔
2415
  size_t  size = 0;
28,789✔
2416
  SSDataBlock*            pBlockRes = NULL;
28,789✔
2417

2418
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
28,789!
2419
  void* pTask = sStreamReaderInfo->pTask;
28,789✔
2420
  ST_TASK_DLOG("vgId:%d %s start, skey:%"PRId64",ekey:%"PRId64",gid:%"PRId64, TD_VID(pVnode), __func__, 
28,789✔
2421
    req->tsdbCalcDataReq.skey, req->tsdbCalcDataReq.ekey, req->tsdbCalcDataReq.gid);
2422

2423
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->triggerCols, TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN);
28,790!
2424

2425
  SStreamReaderTaskInner* pTaskInner = NULL;
28,790✔
2426
  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_CALC_DATA);
28,790✔
2427

2428
  if (req->base.type == STRIGGER_PULL_TSDB_CALC_DATA) {
28,790!
2429
    BUILD_OPTION(options, sStreamReaderInfo, req->tsdbCalcDataReq.ver, TSDB_ORDER_ASC, req->tsdbCalcDataReq.skey, req->tsdbCalcDataReq.ekey,
28,790✔
2430
                 sStreamReaderInfo->triggerCols, false, STREAM_SCAN_GROUP_ONE_BY_ONE, req->tsdbCalcDataReq.gid, true, NULL);
2431
    SStorageAPI api = {0};
28,790✔
2432
    initStorageAPI(&api);
28,790✔
2433
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, &api));
28,790✔
2434

2435
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
28,444!
2436
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
28,444!
2437
  } else {
2438
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
×
2439
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
×
2440
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
×
2441
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
×
2442
  }
2443

2444
  blockDataCleanup(pTaskInner->pResBlockDst);
28,444✔
2445
  bool hasNext = true;
28,444✔
2446
  while (1) {
2,741✔
2447
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
31,185!
2448
    if (!hasNext) {
31,180✔
2449
      break;
28,439✔
2450
    }
2451
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
2,741✔
2452

2453
    SSDataBlock* pBlock = NULL;
2,742✔
2454
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
2,742!
2455
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, pTaskInner->pFilterInfo, NULL));
2,741!
2456
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
2,741!
2457
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
2,741!
2458
      break;
×
2459
    }
2460
  }
2461

2462
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->calcResBlock, false, &pBlockRes));
28,439!
2463
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlockRes, pTaskInner->pResBlockDst->info.capacity));
28,436!
2464
  blockDataTransform(pBlockRes, pTaskInner->pResBlockDst);
28,437✔
2465
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));
28,437!
2466
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlockRes->info.rows);
28,436✔
2467
  if (!hasNext) {
28,444!
2468
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
28,444!
2469
  }
2470

2471
end:
28,441✔
2472
  STREAM_PRINT_LOG_END_WITHID(code, lino);
28,787!
2473
  SRpcMsg rsp = {
28,789✔
2474
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
2475
  tmsgSendRsp(&rsp);
28,789✔
2476
  blockDataDestroy(pBlockRes);
28,790✔
2477
  return code;
28,788✔
2478
}
2479

2480
static int32_t vnodeProcessStreamTsdbVirtalDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
837✔
2481
  int32_t code = 0;
837✔
2482
  int32_t lino = 0;
837✔
2483
  void*   buf = NULL;
837✔
2484
  size_t  size = 0;
837✔
2485
  int32_t* slotIdList = NULL;
837✔
2486
  SArray* sortedCid = NULL;
837✔
2487
  SArray* schemas = NULL;
837✔
2488
  
2489
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
837!
2490
  void* pTask = sStreamReaderInfo->pTask;
837✔
2491
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
837✔
2492

2493
  SStreamReaderTaskInner* pTaskInner = NULL;
837✔
2494
  int64_t key = req->tsdbDataReq.uid;
837✔
2495

2496
  if (req->base.type == STRIGGER_PULL_TSDB_DATA) {
837!
2497
    // sort cid and build slotIdList
2498
    slotIdList = taosMemoryMalloc(taosArrayGetSize(req->tsdbDataReq.cids) * sizeof(int32_t));
837!
2499
    STREAM_CHECK_NULL_GOTO(slotIdList, terrno);
839!
2500
    sortedCid = taosArrayDup(req->tsdbDataReq.cids, NULL);
837✔
2501
    STREAM_CHECK_NULL_GOTO(sortedCid, terrno);
837!
2502
    taosArraySort(sortedCid, sortCid);
837✔
2503
    for (int32_t i = 0; i < taosArrayGetSize(req->tsdbDataReq.cids); i++) {
3,202✔
2504
      int16_t* cid = taosArrayGet(req->tsdbDataReq.cids, i);
2,365✔
2505
      STREAM_CHECK_NULL_GOTO(cid, terrno);
2,365!
2506
      for (int32_t j = 0; j < taosArrayGetSize(sortedCid); j++) {
4,593!
2507
        int16_t* cidSorted = taosArrayGet(sortedCid, j);
4,593✔
2508
        STREAM_CHECK_NULL_GOTO(cidSorted, terrno);
4,593!
2509
        if (*cid == *cidSorted) {
4,593✔
2510
          slotIdList[j] = i;
2,365✔
2511
          break;
2,365✔
2512
        }
2513
      }
2514
    }
2515

2516
    STREAM_CHECK_RET_GOTO(buildScheamFromMeta(pVnode, req->tsdbDataReq.uid, &schemas));
837✔
2517
    STREAM_CHECK_RET_GOTO(shrinkScheams(req->tsdbDataReq.cids, schemas));
835!
2518
    BUILD_OPTION(options, sStreamReaderInfo, req->tsdbDataReq.ver, req->tsdbDataReq.order, req->tsdbDataReq.skey,
835✔
2519
                    req->tsdbDataReq.ekey, schemas, true, STREAM_SCAN_ALL, 0, false, NULL);
2520

2521
    options.suid = req->tsdbDataReq.suid;
835✔
2522
    options.uid = req->tsdbDataReq.uid;
835✔
2523

2524
    SStorageAPI api = {0};
835✔
2525
    initStorageAPI(&api);
835✔
2526
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, &api));
835!
2527
    STREAM_CHECK_RET_GOTO(taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner)));
835!
2528

2529
    STableKeyInfo       keyInfo = {.uid = req->tsdbDataReq.uid};
835✔
2530
    cleanupQueryTableDataCond(&pTaskInner->cond);
835✔
2531
    taosArraySort(pTaskInner->options.schemas, sortSSchema);
835✔
2532

2533
    STREAM_CHECK_RET_GOTO(qStreamInitQueryTableDataCond(&pTaskInner->cond, pTaskInner->options.order, pTaskInner->options.schemas,
835!
2534
                                                        pTaskInner->options.isSchema, pTaskInner->options.twindows,
2535
                                                        pTaskInner->options.suid, pTaskInner->options.ver, &slotIdList));
2536
    STREAM_CHECK_RET_GOTO(pTaskInner->api.tsdReader.tsdReaderOpen(pVnode, &pTaskInner->cond, &keyInfo, 1, pTaskInner->pResBlock,
835!
2537
                                                             (void**)&pTaskInner->pReader, pTaskInner->idStr, NULL));
2538
    STREAM_CHECK_RET_GOTO(createOneDataBlock(pTaskInner->pResBlock, false, &pTaskInner->pResBlockDst));
835!
2539
  } else {
2540
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
×
2541
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
×
2542
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
×
2543
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
×
2544
  }
2545

2546
  blockDataCleanup(pTaskInner->pResBlockDst);
835✔
2547
  bool hasNext = true;
835✔
2548
  while (1) {
835✔
2549
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
1,670!
2550
    if (!hasNext) {
1,670✔
2551
      break;
835✔
2552
    }
2553

2554
    SSDataBlock* pBlock = NULL;
835✔
2555
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
835!
2556
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
835!
2557
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
835!
2558
      break;
×
2559
    }
2560
  }
2561
  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
835!
2562
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
835✔
2563
  printDataBlock(pTaskInner->pResBlockDst, __func__, "tsdb_data", ((SStreamTask*)pTask)->streamId);
835✔
2564
  if (!hasNext) {
835!
2565
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
835!
2566
  }
2567

2568
end:
835✔
2569
  STREAM_PRINT_LOG_END_WITHID(code, lino);
837!
2570
  SRpcMsg rsp = {
837✔
2571
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
2572
  tmsgSendRsp(&rsp);
837✔
2573
  taosMemFree(slotIdList);
837✔
2574
  taosArrayDestroy(sortedCid);
837✔
2575
  taosArrayDestroy(schemas);
837✔
2576
  return code;
837✔
2577
}
2578

2579
static int32_t vnodeProcessStreamWalMetaNewReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
15,349✔
2580
  int32_t      code = 0;
15,349✔
2581
  int32_t      lino = 0;
15,349✔
2582
  void*        buf = NULL;
15,349✔
2583
  size_t       size = 0;
15,349✔
2584
  int64_t      lastVer = 0;
15,349✔
2585
  SSTriggerWalNewRsp resultRsp = {0};
15,349✔
2586

2587
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
15,349✔
2588
  void* pTask = sStreamReaderInfo->pTask;
15,280✔
2589
  ST_TASK_DLOG("vgId:%d %s start, request paras lastVer:%" PRId64, TD_VID(pVnode), __func__, req->walMetaNewReq.lastVer);
15,280✔
2590

2591
  if (sStreamReaderInfo->metaBlock == NULL) {
15,280✔
2592
    STREAM_CHECK_RET_GOTO(createBlockForWalMetaNew((SSDataBlock**)&sStreamReaderInfo->metaBlock));
388!
2593
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(sStreamReaderInfo->metaBlock, STREAM_RETURN_ROWS_NUM));
388!
2594
  }
2595
  blockDataEmpty(sStreamReaderInfo->metaBlock);
15,280✔
2596
  resultRsp.metaBlock = sStreamReaderInfo->metaBlock;
15,257✔
2597
  resultRsp.ver = req->walMetaNewReq.lastVer;
15,257✔
2598
  STREAM_CHECK_RET_GOTO(processWalVerMetaNew(pVnode, &resultRsp, sStreamReaderInfo, req->walMetaNewReq.ctime));
15,257!
2599

2600
  ST_TASK_DLOG("vgId:%d %s get result last ver:%"PRId64" rows:%d", TD_VID(pVnode), __func__, resultRsp.ver, resultRsp.totalRows);
15,279✔
2601
  STREAM_CHECK_CONDITION_GOTO(resultRsp.totalRows == 0, TDB_CODE_SUCCESS);
15,278✔
2602
  size = tSerializeSStreamWalDataResponse(NULL, 0, &resultRsp, NULL);
520✔
2603
  buf = rpcMallocCont(size);
520✔
2604
  size = tSerializeSStreamWalDataResponse(buf, size, &resultRsp, NULL);
520✔
2605
  printDataBlock(sStreamReaderInfo->metaBlock, __func__, "meta", ((SStreamTask*)pTask)->streamId);
519✔
2606

2607
end:
15,349✔
2608
  if (resultRsp.totalRows == 0) {
15,349✔
2609
    code = TSDB_CODE_STREAM_NO_DATA;
14,830✔
2610
    buf = rpcMallocCont(sizeof(int64_t));
14,830✔
2611
    *(int64_t *)buf = resultRsp.ver;
14,839✔
2612
    size = sizeof(int64_t);
14,839✔
2613
  }
2614
  SRpcMsg rsp = {
15,358✔
2615
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
2616
  tmsgSendRsp(&rsp);
15,358✔
2617
  if (code == TSDB_CODE_STREAM_NO_DATA){
15,358✔
2618
    code = 0;
14,838✔
2619
  }
2620
  STREAM_PRINT_LOG_END_WITHID(code, lino);
15,358!
2621
  blockDataDestroy(resultRsp.deleteBlock);
15,358✔
2622
  blockDataDestroy(resultRsp.dropBlock);
15,356✔
2623

2624
  return code;
15,359✔
2625
}
2626
static int32_t vnodeProcessStreamWalMetaDataNewReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
8,386✔
2627
  int32_t      code = 0;
8,386✔
2628
  int32_t      lino = 0;
8,386✔
2629
  void*        buf = NULL;
8,386✔
2630
  size_t       size = 0;
8,386✔
2631
  SSTriggerWalNewRsp resultRsp = {0};
8,386✔
2632
  
2633
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
8,386!
2634
  void* pTask = sStreamReaderInfo->pTask;
8,386✔
2635
  ST_TASK_DLOG("vgId:%d %s start, request paras lastVer:%" PRId64, TD_VID(pVnode), __func__, req->walMetaDataNewReq.lastVer);
8,386✔
2636

2637
  if (sStreamReaderInfo->metaBlock == NULL) {
8,386✔
2638
    STREAM_CHECK_RET_GOTO(createBlockForWalMetaNew((SSDataBlock**)&sStreamReaderInfo->metaBlock));
176!
2639
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(sStreamReaderInfo->metaBlock, STREAM_RETURN_ROWS_NUM));
176!
2640
  }
2641
  resultRsp.metaBlock = sStreamReaderInfo->metaBlock;
8,386✔
2642
  resultRsp.dataBlock = sStreamReaderInfo->triggerBlock;
8,386✔
2643
  resultRsp.ver = req->walMetaDataNewReq.lastVer;
8,386✔
2644
  STREAM_CHECK_RET_GOTO(processWalVerMetaDataNew(pVnode, sStreamReaderInfo, &resultRsp));
8,386!
2645

2646
  STREAM_CHECK_CONDITION_GOTO(resultRsp.totalRows == 0, TDB_CODE_SUCCESS);
8,380✔
2647
  size = tSerializeSStreamWalDataResponse(NULL, 0, &resultRsp, sStreamReaderInfo->indexHash);
312✔
2648
  buf = rpcMallocCont(size);
312✔
2649
  size = tSerializeSStreamWalDataResponse(buf, size, &resultRsp, sStreamReaderInfo->indexHash);
312✔
2650
  printDataBlock(sStreamReaderInfo->metaBlock, __func__, "meta", ((SStreamTask*)pTask)->streamId);
311✔
2651
  printDataBlock(sStreamReaderInfo->triggerBlock, __func__, "data", ((SStreamTask*)pTask)->streamId);
311✔
2652
  printDataBlock(resultRsp.dropBlock, __func__, "drop", ((SStreamTask*)pTask)->streamId);
311✔
2653
  printDataBlock(resultRsp.deleteBlock, __func__, "delete", ((SStreamTask*)pTask)->streamId);
311✔
2654
  printIndexHash(sStreamReaderInfo->indexHash, pTask);
312✔
2655

2656
end:
8,376✔
2657
  if (resultRsp.totalRows == 0) {
8,376✔
2658
    buf = rpcMallocCont(sizeof(int64_t));
8,064✔
2659
    *(int64_t *)buf = resultRsp.ver;
8,063✔
2660
    size = sizeof(int64_t);
8,063✔
2661
    code = TSDB_CODE_STREAM_NO_DATA;
8,063✔
2662
  }
2663
  SRpcMsg rsp = {
8,375✔
2664
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
2665
  tmsgSendRsp(&rsp);
8,375✔
2666
  if (code == TSDB_CODE_STREAM_NO_DATA){
8,377✔
2667
    code = 0;
8,066✔
2668
  }
2669
  blockDataDestroy(resultRsp.deleteBlock);
8,377✔
2670
  blockDataDestroy(resultRsp.dropBlock);
8,370✔
2671

2672
  STREAM_PRINT_LOG_END_WITHID(code, lino);
8,368!
2673

2674
  return code;
8,365✔
2675
}
2676

2677
static int32_t vnodeProcessStreamWalDataNewReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
7,185✔
2678
  int32_t      code = 0;
7,185✔
2679
  int32_t      lino = 0;
7,185✔
2680
  void*        buf = NULL;
7,185✔
2681
  size_t       size = 0;
7,185✔
2682
  SSTriggerWalNewRsp resultRsp = {0};
7,185✔
2683

2684
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
7,185✔
2685
  void* pTask = sStreamReaderInfo->pTask;
7,184✔
2686
  ST_TASK_DLOG("vgId:%d %s start, request paras size:%zu", TD_VID(pVnode), __func__, taosArrayGetSize(req->walDataNewReq.versions));
7,184✔
2687

2688
  resultRsp.dataBlock = sStreamReaderInfo->triggerBlock;
7,184✔
2689
  STREAM_CHECK_RET_GOTO(processWalVerDataNew(pVnode, sStreamReaderInfo, req->walDataNewReq.versions, req->walDataNewReq.ranges, &resultRsp));
7,184!
2690
  ST_TASK_DLOG("vgId:%d %s get result last ver:%"PRId64" rows:%d", TD_VID(pVnode), __func__, resultRsp.ver, resultRsp.totalRows);
7,184✔
2691

2692
  STREAM_CHECK_CONDITION_GOTO(resultRsp.totalRows == 0, TDB_CODE_SUCCESS);
7,184✔
2693

2694
  size = tSerializeSStreamWalDataResponse(NULL, 0, &resultRsp, sStreamReaderInfo->indexHash);
267✔
2695
  buf = rpcMallocCont(size);
267✔
2696
  size = tSerializeSStreamWalDataResponse(buf, size, &resultRsp, sStreamReaderInfo->indexHash);
267✔
2697
  printDataBlock(sStreamReaderInfo->triggerBlock, __func__, "data", ((SStreamTask*)pTask)->streamId);
267✔
2698
  printIndexHash(sStreamReaderInfo->indexHash, pTask);
267✔
2699

2700
end:
7,187✔
2701
  if (resultRsp.totalRows == 0) {
7,187✔
2702
    buf = rpcMallocCont(sizeof(int64_t));
6,920✔
2703
    *(int64_t *)buf = resultRsp.ver;
6,924✔
2704
    size = sizeof(int64_t);
6,924✔
2705
    code = TSDB_CODE_STREAM_NO_DATA;
6,924✔
2706
  }
2707
  SRpcMsg rsp = {
7,191✔
2708
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
2709
  tmsgSendRsp(&rsp);
7,191✔
2710
  if (code == TSDB_CODE_STREAM_NO_DATA){
7,190✔
2711
    code = 0;
6,924✔
2712
  }
2713

2714
  blockDataDestroy(resultRsp.deleteBlock);
7,190✔
2715
  blockDataDestroy(resultRsp.dropBlock);
7,190✔
2716
  STREAM_PRINT_LOG_END_WITHID(code, lino);
7,194!
2717

2718
  return code;
7,190✔
2719
}
2720

2721
static int32_t vnodeProcessStreamWalCalcDataNewReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
1,221✔
2722
  int32_t      code = 0;
1,221✔
2723
  int32_t      lino = 0;
1,221✔
2724
  void*        buf = NULL;
1,221✔
2725
  size_t       size = 0;
1,221✔
2726
  SSTriggerWalNewRsp resultRsp = {0};
1,221✔
2727
  SSDataBlock* pBlock1 = NULL;
1,221✔
2728
  SSDataBlock* pBlock2 = NULL;
1,221✔
2729
  
2730
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
1,221!
2731
  void* pTask = sStreamReaderInfo->pTask;
1,221✔
2732
  ST_TASK_DLOG("vgId:%d %s start, request paras size:%zu", TD_VID(pVnode), __func__, taosArrayGetSize(req->walDataNewReq.versions));
1,221✔
2733

2734
  resultRsp.dataBlock = sStreamReaderInfo->isVtableStream ? sStreamReaderInfo->calcBlock : sStreamReaderInfo->triggerBlock;
1,221✔
2735
  resultRsp.isCalc = sStreamReaderInfo->isVtableStream ? true : false;
1,221✔
2736
  STREAM_CHECK_RET_GOTO(processWalVerDataNew(pVnode, sStreamReaderInfo, req->walDataNewReq.versions, req->walDataNewReq.ranges, &resultRsp));
1,221!
2737
  STREAM_CHECK_CONDITION_GOTO(resultRsp.totalRows == 0, TDB_CODE_SUCCESS);
1,221✔
2738

2739
  if (!sStreamReaderInfo->isVtableStream){
691✔
2740
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerBlock, true, &pBlock1));
489!
2741
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->calcBlock, false, &pBlock2));
489!
2742
  
2743
    blockDataTransform(pBlock2, pBlock1);
489✔
2744
    resultRsp.dataBlock = pBlock2;
489✔
2745
  }
2746

2747
  size = tSerializeSStreamWalDataResponse(NULL, 0, &resultRsp, sStreamReaderInfo->indexHash);
691✔
2748
  buf = rpcMallocCont(size);
691✔
2749
  size = tSerializeSStreamWalDataResponse(buf, size, &resultRsp, sStreamReaderInfo->indexHash);
691✔
2750
  printDataBlock(resultRsp.dataBlock, __func__, "data", ((SStreamTask*)pTask)->streamId);
691✔
2751
  printIndexHash(sStreamReaderInfo->indexHash, pTask);
691✔
2752

2753
end:
1,221✔
2754
  if (resultRsp.totalRows == 0) {
1,221✔
2755
    buf = rpcMallocCont(sizeof(int64_t));
530✔
2756
    *(int64_t *)buf = resultRsp.ver;
530✔
2757
    size = sizeof(int64_t);
530✔
2758
    code = TSDB_CODE_STREAM_NO_DATA;
530✔
2759
  }
2760
  SRpcMsg rsp = {
1,221✔
2761
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
2762
  tmsgSendRsp(&rsp);
1,221✔
2763
  if (code == TSDB_CODE_STREAM_NO_DATA){
1,221✔
2764
    code = 0;
530✔
2765
  }
2766

2767
  blockDataDestroy(pBlock1);
1,221✔
2768
  blockDataDestroy(pBlock2);
1,221✔
2769
  blockDataDestroy(resultRsp.deleteBlock);
1,221✔
2770
  blockDataDestroy(resultRsp.dropBlock);
1,221✔
2771
  STREAM_PRINT_LOG_END_WITHID(code, lino);
1,221!
2772

2773
  return code;
1,221✔
2774
}
2775

2776
static int32_t vnodeProcessStreamGroupColValueReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
745✔
2777
  int32_t code = 0;
745✔
2778
  int32_t lino = 0;
745✔
2779
  void*   buf = NULL;
745✔
2780
  size_t  size = 0;
745✔
2781

2782
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
745!
2783
  void* pTask = sStreamReaderInfo->pTask;
745✔
2784
  ST_TASK_DLOG("vgId:%d %s start, request gid:%" PRId64, TD_VID(pVnode), __func__, req->groupColValueReq.gid);
745✔
2785

2786
  SArray** gInfo = taosHashGet(sStreamReaderInfo->groupIdMap, &req->groupColValueReq.gid, POINTER_BYTES);
745✔
2787
  STREAM_CHECK_NULL_GOTO(gInfo, TSDB_CODE_STREAM_NO_CONTEXT);
746!
2788
  SStreamGroupInfo pGroupInfo = {0};
746✔
2789
  pGroupInfo.gInfo = *gInfo;
746✔
2790

2791
  size = tSerializeSStreamGroupInfo(NULL, 0, &pGroupInfo, TD_VID(pVnode));
746✔
2792
  STREAM_CHECK_CONDITION_GOTO(size < 0, size);
2793
  buf = rpcMallocCont(size);
746✔
2794
  STREAM_CHECK_NULL_GOTO(buf, terrno);
746!
2795
  size = tSerializeSStreamGroupInfo(buf, size, &pGroupInfo, TD_VID(pVnode));
746✔
2796
  STREAM_CHECK_CONDITION_GOTO(size < 0, size);
2797
end:
746✔
2798
  if (code != 0) {
746!
2799
    rpcFreeCont(buf);
×
2800
    buf = NULL;
×
2801
    size = 0;
×
2802
  }
2803
  STREAM_PRINT_LOG_END_WITHID(code, lino);
746!
2804
  SRpcMsg rsp = {
746✔
2805
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
2806
  tmsgSendRsp(&rsp);
746✔
2807

2808
  return code;
746✔
2809
}
2810

2811
static int32_t vnodeProcessStreamVTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
732✔
2812
  int32_t              code = 0;
732✔
2813
  int32_t              lino = 0;
732✔
2814
  void*                buf = NULL;
732✔
2815
  size_t               size = 0;
732✔
2816
  SStreamMsgVTableInfo vTableInfo = {0};
732✔
2817
  SMetaReader          metaReader = {0};
732✔
2818
  SStorageAPI api = {0};
732✔
2819
  initStorageAPI(&api);
732✔
2820

2821
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
733!
2822
  void* pTask = sStreamReaderInfo->pTask;
733✔
2823
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);
733✔
2824

2825
  SArray* cids = req->virTableInfoReq.cids;
733✔
2826
  STREAM_CHECK_NULL_GOTO(cids, terrno);
733!
2827

2828
  SArray* pTableListArray = qStreamGetTableArrayList(sStreamReaderInfo->tableList);
733✔
2829
  STREAM_CHECK_NULL_GOTO(pTableListArray, terrno);
733!
2830

2831
  vTableInfo.infos = taosArrayInit(taosArrayGetSize(pTableListArray), sizeof(VTableInfo));
733✔
2832
  STREAM_CHECK_NULL_GOTO(vTableInfo.infos, terrno);
733!
2833
  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
733✔
2834

2835
  for (size_t i = 0; i < taosArrayGetSize(pTableListArray); i++) {
2,319✔
2836
    STableKeyInfo* pKeyInfo = taosArrayGet(pTableListArray, i);
1,586✔
2837
    if (pKeyInfo == NULL) {
1,586!
2838
      continue;
×
2839
    }
2840
    VTableInfo* vTable = taosArrayReserve(vTableInfo.infos, 1);
1,586✔
2841
    STREAM_CHECK_NULL_GOTO(vTable, terrno);
1,586!
2842
    vTable->uid = pKeyInfo->uid;
1,586✔
2843
    vTable->gId = pKeyInfo->groupId;
1,586✔
2844

2845
    code = api.metaReaderFn.getTableEntryByUid(&metaReader, pKeyInfo->uid);
1,586✔
2846
    if (taosArrayGetSize(cids) == 1 && *(col_id_t*)taosArrayGet(cids, 0) == PRIMARYKEY_TIMESTAMP_COL_ID){
1,586!
2847
      vTable->cols.nCols = metaReader.me.colRef.nCols;
109✔
2848
      vTable->cols.version = metaReader.me.colRef.version;
109✔
2849
      vTable->cols.pColRef = taosMemoryCalloc(metaReader.me.colRef.nCols, sizeof(SColRef));
109!
2850
      for (size_t j = 0; j < metaReader.me.colRef.nCols; j++) {
654✔
2851
        memcpy(vTable->cols.pColRef + j, &metaReader.me.colRef.pColRef[j], sizeof(SColRef));
545✔
2852
      }
2853
    } else {
2854
      vTable->cols.nCols = taosArrayGetSize(cids);
1,477✔
2855
      vTable->cols.version = metaReader.me.colRef.version;
1,477✔
2856
      vTable->cols.pColRef = taosMemoryCalloc(taosArrayGetSize(cids), sizeof(SColRef));
1,477!
2857
      for (size_t i = 0; i < taosArrayGetSize(cids); i++) {
5,406✔
2858
        for (size_t j = 0; j < metaReader.me.colRef.nCols; j++) {
15,918✔
2859
          if (metaReader.me.colRef.pColRef[j].hasRef &&
14,442✔
2860
              metaReader.me.colRef.pColRef[j].id == *(col_id_t*)taosArrayGet(cids, i)) {
10,419✔
2861
            memcpy(vTable->cols.pColRef + i, &metaReader.me.colRef.pColRef[j], sizeof(SColRef));
2,454✔
2862
            break;
2,454✔
2863
          }
2864
        }
2865
      }
2866
    }
2867
    tDecoderClear(&metaReader.coder);
1,585✔
2868
  }
2869
  ST_TASK_DLOG("vgId:%d %s end", TD_VID(pVnode), __func__);
732✔
2870
  STREAM_CHECK_RET_GOTO(buildVTableInfoRsp(&vTableInfo, &buf, &size));
732!
2871

2872
end:
733✔
2873
  tDestroySStreamMsgVTableInfo(&vTableInfo);
733✔
2874
  api.metaReaderFn.clearReader(&metaReader);
733✔
2875
  STREAM_PRINT_LOG_END_WITHID(code, lino);
733!
2876
  SRpcMsg rsp = {
733✔
2877
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
2878
  tmsgSendRsp(&rsp);
733✔
2879
  return code;
732✔
2880
}
2881

2882
static int32_t vnodeProcessStreamOTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req) {
196✔
2883
  int32_t                   code = 0;
196✔
2884
  int32_t                   lino = 0;
196✔
2885
  void*                     buf = NULL;
196✔
2886
  size_t                    size = 0;
196✔
2887
  SSTriggerOrigTableInfoRsp oTableInfo = {0};
196✔
2888
  SMetaReader               metaReader = {0};
196✔
2889
  int64_t streamId = req->base.streamId;
196✔
2890
  stsDebug("vgId:%d %s start", TD_VID(pVnode), __func__);
196✔
2891

2892
  SStorageAPI api = {0};
196✔
2893
  initStorageAPI(&api);
196✔
2894

2895
  SArray* cols = req->origTableInfoReq.cols;
196✔
2896
  STREAM_CHECK_NULL_GOTO(cols, terrno);
196!
2897

2898
  oTableInfo.cols = taosArrayInit(taosArrayGetSize(cols), sizeof(OTableInfoRsp));
196✔
2899

2900
  STREAM_CHECK_NULL_GOTO(oTableInfo.cols, terrno);
196!
2901

2902
  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
196✔
2903
  for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
721✔
2904
    OTableInfo*    oInfo = taosArrayGet(cols, i);
525✔
2905
    OTableInfoRsp* vTableInfo = taosArrayReserve(oTableInfo.cols, 1);
525✔
2906
    STREAM_CHECK_NULL_GOTO(oInfo, terrno);
525!
2907
    STREAM_CHECK_NULL_GOTO(vTableInfo, terrno);
525!
2908
    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByName(&metaReader, oInfo->refTableName));
525!
2909
    vTableInfo->uid = metaReader.me.uid;
525✔
2910
    stsDebug("vgId:%d %s uid:%"PRId64, TD_VID(pVnode), __func__, vTableInfo->uid);
525✔
2911

2912
    SSchemaWrapper* sSchemaWrapper = NULL;
524✔
2913
    if (metaReader.me.type == TD_CHILD_TABLE) {
524✔
2914
      int64_t suid = metaReader.me.ctbEntry.suid;
520✔
2915
      vTableInfo->suid = suid;
520✔
2916
      tDecoderClear(&metaReader.coder);
520✔
2917
      STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, suid));
520!
2918
      sSchemaWrapper = &metaReader.me.stbEntry.schemaRow;
519✔
2919
    } else if (metaReader.me.type == TD_NORMAL_TABLE) {
4!
2920
      vTableInfo->suid = 0;
4✔
2921
      sSchemaWrapper = &metaReader.me.ntbEntry.schemaRow;
4✔
2922
    } else {
2923
      stError("invalid table type:%d", metaReader.me.type);
×
2924
    }
2925

2926
    for (size_t j = 0; j < sSchemaWrapper->nCols; j++) {
1,767!
2927
      SSchema* s = sSchemaWrapper->pSchema + j;
1,768✔
2928
      if (strcmp(s->name, oInfo->refColName) == 0) {
1,768✔
2929
        vTableInfo->cid = s->colId;
524✔
2930
        break;
524✔
2931
      }
2932
    }
2933
    if (vTableInfo->cid == 0) {
523!
2934
      stError("vgId:%d %s, not found col %s in table %s", TD_VID(pVnode), __func__, oInfo->refColName,
×
2935
              oInfo->refTableName);
2936
    }
2937
    tDecoderClear(&metaReader.coder);
523✔
2938
  }
2939

2940
  STREAM_CHECK_RET_GOTO(buildOTableInfoRsp(&oTableInfo, &buf, &size));
196!
2941

2942
end:
196✔
2943
  tDestroySTriggerOrigTableInfoRsp(&oTableInfo);
196✔
2944
  api.metaReaderFn.clearReader(&metaReader);
196✔
2945
  STREAM_PRINT_LOG_END(code, lino);
196!
2946
  SRpcMsg rsp = {
196✔
2947
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
2948
  tmsgSendRsp(&rsp);
196✔
2949
  return code;
196✔
2950
}
2951

2952
static int32_t vnodeProcessStreamVTableTagInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req) {
1,404✔
2953
  int32_t                   code = 0;
1,404✔
2954
  int32_t                   lino = 0;
1,404✔
2955
  void*                     buf = NULL;
1,404✔
2956
  size_t                    size = 0;
1,404✔
2957
  SSDataBlock* pBlock = NULL;
1,404✔
2958

2959
  SMetaReader               metaReader = {0};
1,404✔
2960
  SMetaReader               metaReaderStable = {0};
1,404✔
2961
  int64_t streamId = req->base.streamId;
1,404✔
2962
  stsDebug("vgId:%d %s start", TD_VID(pVnode), __func__);
1,404✔
2963

2964
  SStorageAPI api = {0};
1,404✔
2965
  initStorageAPI(&api);
1,404✔
2966

2967
  SArray* cols = req->virTablePseudoColReq.cids;
1,404✔
2968
  STREAM_CHECK_NULL_GOTO(cols, terrno);
1,404!
2969

2970
  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
1,404✔
2971
  STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, req->virTablePseudoColReq.uid));
1,404!
2972

2973
  STREAM_CHECK_CONDITION_GOTO(metaReader.me.type != TD_VIRTUAL_CHILD_TABLE && metaReader.me.type != TD_VIRTUAL_NORMAL_TABLE, TSDB_CODE_INVALID_PARA);
1,404!
2974

2975
  STREAM_CHECK_RET_GOTO(createDataBlock(&pBlock));
1,404!
2976
  if (metaReader.me.type == TD_VIRTUAL_NORMAL_TABLE) {
1,404✔
2977
    STREAM_CHECK_CONDITION_GOTO (taosArrayGetSize(cols) < 1 || *(col_id_t*)taosArrayGet(cols, 0) != -1, TSDB_CODE_INVALID_PARA);
7!
2978
    SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN, -1);
7✔
2979
    STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
7!
2980
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, 1));
7!
2981
    pBlock->info.rows = 1;
7✔
2982
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, 0);
7✔
2983
    STREAM_CHECK_NULL_GOTO(pDst, terrno);
7!
2984
    STREAM_CHECK_RET_GOTO(varColSetVarData(pDst, 0, metaReader.me.name, strlen(metaReader.me.name), false));
7!
2985
  } else if (metaReader.me.type == TD_VIRTUAL_CHILD_TABLE){
1,397!
2986
    int64_t suid = metaReader.me.ctbEntry.suid;
1,397✔
2987
    api.metaReaderFn.readerReleaseLock(&metaReader);
1,397✔
2988
    api.metaReaderFn.initReader(&metaReaderStable, pVnode, META_READER_LOCK, &api.metaFn);
1,397✔
2989

2990
    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReaderStable, suid));
1,397!
2991
    SSchemaWrapper*  sSchemaWrapper = &metaReaderStable.me.stbEntry.schemaTag;
1,397✔
2992
    for (size_t i = 0; i < taosArrayGetSize(cols); i++){
3,669✔
2993
      col_id_t* id = taosArrayGet(cols, i);
2,270✔
2994
      STREAM_CHECK_NULL_GOTO(id, terrno);
2,271!
2995
      if (*id == -1) {
2,271✔
2996
        SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN, -1);
1,390✔
2997
        STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
1,391!
2998
        continue;
1,391✔
2999
      }
3000
      size_t j = 0;
881✔
3001
      for (; j < sSchemaWrapper->nCols; j++) {
1,585✔
3002
        SSchema* s = sSchemaWrapper->pSchema + j;
1,584✔
3003
        if (s->colId == *id) {
1,584✔
3004
          SColumnInfoData idata = createColumnInfoData(s->type, s->bytes, s->colId);
880✔
3005
          STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
881!
3006
          break;
880✔
3007
        }
3008
      }
3009
      if (j == sSchemaWrapper->nCols) {
881!
3010
        SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_NULL, CHAR_BYTES, *id);
×
3011
        STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
×
3012
      }
3013
    }
3014
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, 1));
1,397!
3015
    pBlock->info.rows = 1;
1,397✔
3016
    
3017
    for (size_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++){
3,667✔
3018
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
2,271✔
3019
      STREAM_CHECK_NULL_GOTO(pDst, terrno);
2,271!
3020

3021
      if (pDst->info.colId == -1) {
2,271✔
3022
        STREAM_CHECK_RET_GOTO(varColSetVarData(pDst, 0, metaReader.me.name, strlen(metaReader.me.name), false));
1,390!
3023
        continue;
1,390✔
3024
      }
3025
      if (pDst->info.type == TSDB_DATA_TYPE_NULL) {
881!
3026
        STREAM_CHECK_RET_GOTO(colDataSetVal(pDst, 0, NULL, true));
×
3027
        continue;
×
3028
      }
3029

3030
      STagVal val = {0};
881✔
3031
      val.cid = pDst->info.colId;
881✔
3032
      const char* p = api.metaFn.extractTagVal(metaReader.me.ctbEntry.pTags, pDst->info.type, &val);
881✔
3033

3034
      char* data = NULL;
881✔
3035
      if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
881!
3036
        data = tTagValToData((const STagVal*)p, false);
881✔
3037
      } else {
3038
        data = (char*)p;
×
3039
      }
3040

3041
      STREAM_CHECK_RET_GOTO(colDataSetVal(pDst, 0, data,
881!
3042
                            (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data))));
3043

3044
      if ((pDst->info.type != TSDB_DATA_TYPE_JSON) && (p != NULL) && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
880!
3045
          (data != NULL)) {
3046
        taosMemoryFree(data);
703!
3047
      }
3048
    }
3049
  } else {
3050
    stError("vgId:%d %s, invalid table type:%d", TD_VID(pVnode), __func__, metaReader.me.type);
×
3051
    code = TSDB_CODE_INVALID_PARA;
×
3052
    goto end;
×
3053
  }
3054
  
3055
  stsDebug("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlock->info.rows);
1,403✔
3056
  printDataBlock(pBlock, __func__, "", streamId);
1,404✔
3057
  STREAM_CHECK_RET_GOTO(buildRsp(pBlock, &buf, &size));
1,404!
3058

3059
end:
1,404✔
3060
  if(size == 0){
1,404!
3061
    code = TSDB_CODE_STREAM_NO_DATA;
×
3062
  }
3063
  api.metaReaderFn.clearReader(&metaReaderStable);
1,404✔
3064
  api.metaReaderFn.clearReader(&metaReader);
1,404✔
3065
  STREAM_PRINT_LOG_END(code, lino);
1,404!
3066
  SRpcMsg rsp = {
1,404✔
3067
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
3068
  tmsgSendRsp(&rsp);
1,404✔
3069
  blockDataDestroy(pBlock);
1,404✔
3070
  return code;
1,404✔
3071
}
3072

3073
static int32_t vnodeProcessStreamFetchMsg(SVnode* pVnode, SRpcMsg* pMsg) {
16,534✔
3074
  int32_t            code = 0;
16,534✔
3075
  int32_t            lino = 0;
16,534✔
3076
  void*              buf = NULL;
16,534✔
3077
  size_t             size = 0;
16,534✔
3078
  void*              taskAddr = NULL;
16,534✔
3079
  SArray*            pResList = NULL;
16,534✔
3080
  bool               hasNext = false;
16,534✔
3081

3082
  SResFetchReq req = {0};
16,534✔
3083
  STREAM_CHECK_CONDITION_GOTO(tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0,
16,534!
3084
                              TSDB_CODE_QRY_INVALID_INPUT);
3085
  SArray* calcInfoList = (SArray*)qStreamGetReaderInfo(req.queryId, req.taskId, &taskAddr);
16,534✔
3086
  STREAM_CHECK_NULL_GOTO(calcInfoList, terrno);
16,534!
3087

3088
  STREAM_CHECK_CONDITION_GOTO(req.execId < 0, TSDB_CODE_INVALID_PARA);
16,534!
3089
  SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo = taosArrayGetP(calcInfoList, req.execId);
16,534✔
3090
  STREAM_CHECK_NULL_GOTO(sStreamReaderCalcInfo, terrno);
16,534!
3091
  void* pTask = sStreamReaderCalcInfo->pTask;
16,534✔
3092
  ST_TASK_DLOG("vgId:%d %s start, execId:%d, reset:%d, pTaskInfo:%p, scan type:%d", TD_VID(pVnode), __func__, req.execId, req.reset,
16,534✔
3093
               sStreamReaderCalcInfo->pTaskInfo, nodeType(sStreamReaderCalcInfo->calcAst->pNode));
3094

3095
  if (req.reset) {
16,534✔
3096
    int64_t uid = 0;
16,436✔
3097
    if (req.dynTbname) {
16,436✔
3098
      SArray* vals = req.pStRtFuncInfo->pStreamPartColVals;
752✔
3099
      for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
752!
3100
        SStreamGroupValue* pValue = taosArrayGet(vals, i);
752✔
3101
        if (pValue != NULL && pValue->isTbname) {
752!
3102
          uid = pValue->uid;
752✔
3103
          break;
752✔
3104
        }
3105
      }
3106
    }
3107
    
3108
    SReadHandle handle = {0};
16,436✔
3109
    handle.vnode = pVnode;
16,436✔
3110
    handle.uid = uid;
16,436✔
3111

3112
    initStorageAPI(&handle.api);
16,436✔
3113
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(sStreamReaderCalcInfo->calcAst->pNode) ||
16,436✔
3114
      QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == nodeType(sStreamReaderCalcInfo->calcAst->pNode)){
14,611✔
3115
      STimeRangeNode* node = (STimeRangeNode*)((STableScanPhysiNode*)(sStreamReaderCalcInfo->calcAst->pNode))->pTimeRange;
14,020✔
3116
      if (node != NULL) {
14,020✔
3117
        STREAM_CHECK_RET_GOTO(processCalaTimeRange(sStreamReaderCalcInfo, &req, node, &handle));
1,602!
3118
      } else {
3119
        ST_TASK_DLOG("vgId:%d %s no time range node", TD_VID(pVnode), __func__);
12,422✔
3120
      }
3121
    }
3122

3123
    TSWAP(sStreamReaderCalcInfo->rtInfo.funcInfo, *req.pStRtFuncInfo);
16,436✔
3124
    handle.streamRtInfo = &sStreamReaderCalcInfo->rtInfo;
16,436✔
3125

3126
    if (sStreamReaderCalcInfo->pTaskInfo == NULL || !qNeedReset(sStreamReaderCalcInfo->pTaskInfo)) {
16,436✔
3127
      qDestroyTask(sStreamReaderCalcInfo->pTaskInfo);
785✔
3128
      STREAM_CHECK_RET_GOTO(qCreateStreamExecTaskInfo(&sStreamReaderCalcInfo->pTaskInfo,
785✔
3129
                                                    sStreamReaderCalcInfo->calcScanPlan, &handle, NULL, TD_VID(pVnode),
3130
                                                    req.taskId));
3131
    } else {
3132
      STREAM_CHECK_RET_GOTO(qResetTableScan(sStreamReaderCalcInfo->pTaskInfo, &handle));
15,651!
3133
    }
3134

3135
    STREAM_CHECK_RET_GOTO(qSetTaskId(sStreamReaderCalcInfo->pTaskInfo, req.taskId, req.queryId));
16,432!
3136
  }
3137

3138
  if (req.pOpParam != NULL) {
16,530✔
3139
    qUpdateOperatorParam(sStreamReaderCalcInfo->pTaskInfo, req.pOpParam);
819✔
3140
  }
3141
  
3142
  pResList = taosArrayInit(4, POINTER_BYTES);
16,530✔
3143
  STREAM_CHECK_NULL_GOTO(pResList, terrno);
16,530!
3144
  uint64_t ts = 0;
16,530✔
3145
  STREAM_CHECK_RET_GOTO(qExecTaskOpt(sStreamReaderCalcInfo->pTaskInfo, pResList, &ts, &hasNext, NULL, req.pOpParam != NULL));
16,530✔
3146

3147
  for(size_t i = 0; i < taosArrayGetSize(pResList); i++){
35,272✔
3148
    SSDataBlock* pBlock = taosArrayGetP(pResList, i);
18,752✔
3149
    if (pBlock == NULL) continue;
18,752!
3150
    printDataBlock(pBlock, __func__, "fetch", ((SStreamTask*)pTask)->streamId);
18,752✔
3151
/*    
3152
    if (sStreamReaderCalcInfo->rtInfo.funcInfo.withExternalWindow) {
3153
      STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, sStreamReaderCalcInfo->pFilterInfo, NULL));
3154
      printDataBlock(pBlock, __func__, "fetch filter");
3155
    }
3156
*/    
3157
  }
3158

3159
end:
16,520✔
3160
  ST_TASK_DLOG("vgId:%d %s start to build rsp", TD_VID(pVnode), __func__);
16,534✔
3161
  STREAM_CHECK_RET_GOTO(streamBuildFetchRsp(pResList, hasNext, &buf, &size, pVnode->config.tsdbCfg.precision));
16,534!
3162
  ST_TASK_DLOG("vgId:%d %s end:", TD_VID(pVnode), __func__);
16,534✔
3163
  taosArrayDestroy(pResList);
16,534✔
3164
  streamReleaseTask(taskAddr);
16,534✔
3165

3166
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_TDB_TABLE_NOT_EXIST){
16,534!
3167
    code = TDB_CODE_SUCCESS;
×
3168
  }
3169
  STREAM_PRINT_LOG_END(code, lino);
16,534!
3170
  SRpcMsg rsp = {.msgType = TDMT_STREAM_FETCH_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
16,534✔
3171
  tmsgSendRsp(&rsp);
16,534✔
3172
  tDestroySResFetchReq(&req);
16,534✔
3173
  return code;
16,533✔
3174
}
3175

3176
int32_t vnodeProcessStreamReaderMsg(SVnode* pVnode, SRpcMsg* pMsg) {
84,058✔
3177
  int32_t                   code = 0;
84,058✔
3178
  int32_t                   lino = 0;
84,058✔
3179
  SSTriggerPullRequestUnion req = {0};
84,058✔
3180
  void*                     taskAddr = NULL;
84,058✔
3181

3182
  vDebug("vgId:%d, msg:%p in stream reader queue is processing", pVnode->config.vgId, pMsg);
84,058✔
3183
  if (!syncIsReadyForRead(pVnode->sync)) {
84,057✔
3184
    vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
91✔
3185
    return 0;
100✔
3186
  }
3187

3188
  if (pMsg->msgType == TDMT_STREAM_FETCH) {
83,950✔
3189
    return vnodeProcessStreamFetchMsg(pVnode, pMsg);
16,534✔
3190
  } else if (pMsg->msgType == TDMT_STREAM_TRIGGER_PULL) {
67,416✔
3191
    void*   pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
67,410✔
3192
    int32_t len = pMsg->contLen - sizeof(SMsgHead);
67,410✔
3193
    STREAM_CHECK_RET_GOTO(tDeserializeSTriggerPullRequest(pReq, len, &req));
67,410!
3194
    stDebug("vgId:%d %s start, type:%d, streamId:%" PRIx64 ", readerTaskId:%" PRIx64 ", sessionId:%" PRIx64,
67,396✔
3195
            TD_VID(pVnode), __func__, req.base.type, req.base.streamId, req.base.readerTaskId, req.base.sessionId);
3196
    SStreamTriggerReaderInfo* sStreamReaderInfo = (STRIGGER_PULL_OTABLE_INFO == req.base.type) ? NULL : qStreamGetReaderInfo(req.base.streamId, req.base.readerTaskId, &taskAddr);
67,406✔
3197
    if (sStreamReaderInfo != NULL) {  
67,399✔
3198
      (void)taosThreadMutexLock(&sStreamReaderInfo->mutex);
67,142✔
3199
      if (sStreamReaderInfo->tableList == NULL) {
67,155✔
3200
        STREAM_CHECK_RET_GOTO(generateTablistForStreamReader(pVnode, sStreamReaderInfo, false));  
563!
3201
        STREAM_CHECK_RET_GOTO(generateTablistForStreamReader(pVnode, sStreamReaderInfo, true));
564!
3202
        STREAM_CHECK_RET_GOTO(filterInitFromNode(sStreamReaderInfo->pConditions, &sStreamReaderInfo->pFilterInfo, 0, NULL));
564!
3203
      }
3204
      (void)taosThreadMutexUnlock(&sStreamReaderInfo->mutex);
67,156✔
3205
      sStreamReaderInfo->pVnode = pVnode;
67,153✔
3206
    }
3207
    switch (req.base.type) {
67,410!
3208
      case STRIGGER_PULL_SET_TABLE:
196✔
3209
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamSetTableReq(pVnode, pMsg, &req, sStreamReaderInfo));
196!
3210
        break;
196✔
3211
      case STRIGGER_PULL_LAST_TS:
554✔
3212
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamLastTsReq(pVnode, pMsg, &req, sStreamReaderInfo));
554!
3213
        break;
553✔
3214
      case STRIGGER_PULL_FIRST_TS:
495✔
3215
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamFirstTsReq(pVnode, pMsg, &req, sStreamReaderInfo));
495✔
3216
        break;
482✔
3217
      case STRIGGER_PULL_TSDB_META:
972✔
3218
      case STRIGGER_PULL_TSDB_META_NEXT:
3219
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbMetaReq(pVnode, pMsg, &req, sStreamReaderInfo));
972✔
3220
        break;
960✔
3221
      case STRIGGER_PULL_TSDB_TS_DATA:
44✔
3222
        if (sStreamReaderInfo->isVtableStream) {
44✔
3223
          STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbTsDataReqVTable(pVnode, pMsg, &req, sStreamReaderInfo));
1!
3224
        } else {
3225
          STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbTsDataReqNonVTable(pVnode, pMsg, &req, sStreamReaderInfo));
43!
3226
        }
3227
        break;
44✔
3228
      case STRIGGER_PULL_TSDB_TRIGGER_DATA:
293✔
3229
      case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT:
3230
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbTriggerDataReq(pVnode, pMsg, &req, sStreamReaderInfo));
293!
3231
        break;
293✔
3232
      case STRIGGER_PULL_TSDB_CALC_DATA:
28,788✔
3233
      case STRIGGER_PULL_TSDB_CALC_DATA_NEXT:
3234
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbCalcDataReq(pVnode, pMsg, &req, sStreamReaderInfo));
28,788✔
3235
        break;
28,443✔
3236
      case STRIGGER_PULL_TSDB_DATA:
837✔
3237
      case STRIGGER_PULL_TSDB_DATA_NEXT:
3238
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbVirtalDataReq(pVnode, pMsg, &req, sStreamReaderInfo));
837✔
3239
        break;
835✔
3240
      case STRIGGER_PULL_GROUP_COL_VALUE:
745✔
3241
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamGroupColValueReq(pVnode, pMsg, &req, sStreamReaderInfo));
745!
3242
        break;
746✔
3243
      case STRIGGER_PULL_VTABLE_INFO:
733✔
3244
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamVTableInfoReq(pVnode, pMsg, &req, sStreamReaderInfo));
733!
3245
        break;
731✔
3246
      case STRIGGER_PULL_VTABLE_PSEUDO_COL:
1,404✔
3247
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamVTableTagInfoReq(pVnode, pMsg, &req));
1,404!
3248
        break;
1,404✔
3249
      case STRIGGER_PULL_OTABLE_INFO:
196✔
3250
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamOTableInfoReq(pVnode, pMsg, &req));
196!
3251
        break;
196✔
3252
      case STRIGGER_PULL_WAL_META_NEW:
15,355✔
3253
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalMetaNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
15,355!
3254
        break;
15,357✔
3255
      case STRIGGER_PULL_WAL_DATA_NEW:
7,188✔
3256
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalDataNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
7,188!
3257
        break;
7,188✔
3258
      case STRIGGER_PULL_WAL_META_DATA_NEW:
8,389✔
3259
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalMetaDataNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
8,389!
3260
        break;
8,361✔
3261
      case STRIGGER_PULL_WAL_CALC_DATA_NEW:
1,221✔
3262
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalCalcDataNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
1,221!
3263
        break;
1,221✔
3264
      default:
×
3265
        vError("unknown inner msg type:%d in stream reader queue", req.base.type);
×
3266
        STREAM_CHECK_RET_GOTO(TSDB_CODE_APP_ERROR);
×
3267
        break;
×
3268
    }
3269
  } else {
3270
    vError("unknown msg type:%d in stream reader queue", pMsg->msgType);
6!
3271
    STREAM_CHECK_RET_GOTO(TSDB_CODE_APP_ERROR);
×
3272
  }
3273
end:
×
3274

3275
  streamReleaseTask(taskAddr);
67,382✔
3276

3277
  tDestroySTriggerPullRequest(&req);
67,424✔
3278
  STREAM_PRINT_LOG_END(code, lino);
67,409!
3279
  return code;
67,399✔
3280
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc