• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4761

28 Sep 2025 10:49AM UTC coverage: 57.837% (-1.0%) from 58.866%
#4761

push

travis-ci

web-flow
merge: set version (#33122)

136913 of 302095 branches covered (45.32%)

Branch coverage included in aggregate %.

207750 of 293830 relevant lines covered (70.7%)

5673932.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/libs/executor/src/anomalywindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "functionMgt.h"
20
#include "operator.h"
21
#include "querytask.h"
22
#include "tanalytics.h"
23
#include "taoserror.h"
24
#include "tcommon.h"
25
#include "tdatablock.h"
26
#include "tjson.h"
27

28
#ifdef USE_ANALYTICS
29

30
// Per-group working state of the anomaly-window operator: the cached input blocks of the group that is
// currently being collected, the anomaly windows (and optional masks) parsed from the analysis response,
// and the cursor over those windows used while aggregating.
typedef struct {
31
  SArray*     blocks;   // SSDataBlock*; copies of the input blocks cached for the current group
32
  SArray*     windows;  // STimeWindow; anomaly windows parsed from the "res" array of the response
33
  SArray*     pMaskList; // int32_t; anomaly mask for each window, parsed from the optional "mask" array
34
  uint64_t    groupId;  // group id of the cached blocks; 0 is treated as "no group seen yet"
35
  int64_t     cachedRows;  // rows cached so far, checked against ANALY_ANOMALY_WINDOW_MAX_ROWS
36
  int32_t     curWinIndex;  // index into `windows` of the current window; search start in anomalyFindWindow
37
  int32_t     curMask;  // mask of the current window, -1 when the response carried no mask for it
38
  STimeWindow curWin;  // copy of the current window
39
  SResultRow* pResultRow;  // single result-row buffer reused for every window
40
} SAnomalyWindowSupp;
41

42
// Operator-level state of the anomaly-window operator (stored in SOperatorInfo::info).
typedef struct {
43
  SOptrBasicInfo     binfo;  // result block (pRes), result-row info and input/output ts order
44
  SAggSupporter      aggSup;  // aggregate supporter set up by initAggSup
45
  SExprSupp          scalarSup;  // scalar expressions built from window.pExprs (when present)
46
  int32_t            tsSlotId;  // slot id of the window.pTspk column; its values are sent as the "ts" column
47
  int32_t            resMarkSlotId;  // result slot of the FUNCTION_TYPE_ANOMALY_MARK expression
48
  STimeWindowAggSupp twAggSup;  // timeWindowData column handed to the aggregate functions
49
  char               algoName[TSDB_ANALYTIC_ALGO_NAME_LEN];  // filled by taosAnalysisParseAlgo, sent as "algo"
50
  char               algoUrl[TSDB_ANALYTIC_ALGO_URL_LEN];  // filled by taosAnalysisParseAlgo, request target
51
  char               anomalyOpt[TSDB_ANALYTIC_ALGO_OPTION_LEN];  // copy of the plan node option string, sent as "option"
52
  int64_t            timeout;  // from taosAnalysisParseTimout, passed to taosAnalySendReqRetJson
53
  int8_t             wncheck;  // from taosAnalysisParseWncheck, sent as "wncheck"
54

55
  SAnomalyWindowSupp anomalySup;  // cached blocks / windows / masks of the current group
56
  SWindowRowsSup     anomalyWinRowSup;  // start row index and row count of the rows being aggregated
57
  SColumn            anomalyCol;  // column extracted from the plan node's pAnomalyKey, sent as the "val" column
58
  SStateKeys         anomalyKey;  // type/bytes copied from anomalyCol plus a pData buffer of that size
59
} SAnomalyWindowOperatorInfo;
60

61
// Forward declarations of file-local helpers; all of them are declared with internal linkage here.
static void    anomalyDestroyOperatorInfo(void* param);
62
static int32_t anomalyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
63
static int32_t anomalyAggregateBlocks(SOperatorInfo* pOperator);
64
static int32_t anomalyCacheBlock(SAnomalyWindowOperatorInfo* pInfo, SSDataBlock* pBlock);
65
static int32_t initOptions(SAnomalyWindowOperatorInfo* pInfo, SAnomalyWindowPhysiNode* pAnomalyNode, const char* id);
66
static int32_t resetAnomalyWindowOperState(SOperatorInfo* pOper);
67
static void setResSlot(SAnomalyWindowOperatorInfo* pInfo, SAnomalyWindowPhysiNode* pAnomalyNode, SExprInfo* pExprInfo,
68
                int32_t num);
69

70
/**
 * Reset the anomaly-window operator so it can be executed again.
 *
 * Releases the per-run state (aggregate/scalar supporters, time-window column, cached blocks, window and
 * mask lists, result row), then rebuilds the expression supporters, the result row, the filter and the
 * time-window column from the physical plan node attached to the operator.
 *
 * @param pOper operator to reset; pOper->info is an SAnomalyWindowOperatorInfo and pOper->pPhyNode an
 *              SAnomalyWindowPhysiNode
 * @return 0 on success, otherwise the error code of the first failing step (also logged)
 */
static int32_t resetAnomalyWindowOperState(SOperatorInfo* pOper) {
  int32_t                     code = 0, lino = 0;
  SAnomalyWindowOperatorInfo* pInfo = pOper->info;
  SAnomalyWindowPhysiNode*    pAnomalyNode = (SAnomalyWindowPhysiNode*)pOper->pPhyNode;
  SExecTaskInfo*              pTaskInfo = pOper->pTaskInfo;
  SExprInfo*                  pExprInfo = NULL;
  size_t                      keyBufSize = 0;
  int32_t                     num = 0;
  const char*                 id = GET_TASKID(pTaskInfo);

  pOper->status = OP_NOT_OPENED;

  resetBasicOperatorState(&pInfo->binfo);

  cleanupAggSup(&pInfo->aggSup);
  cleanupExprSupp(&pInfo->scalarSup);
  colDataDestroy(&pInfo->twAggSup.timeWindowData);

  // the operator owns the cached block copies, so destroy them before emptying the list
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->anomalySup.blocks); ++i) {
    SSDataBlock* pBlock = taosArrayGetP(pInfo->anomalySup.blocks, i);
    blockDataDestroy(pBlock);
  }

  taosArrayClear(pInfo->anomalySup.blocks);
  taosArrayClear(pInfo->anomalySup.windows);
  // masks are indexed in parallel with windows; leaving them behind would pair stale masks with the
  // windows of the next run
  taosArrayClear(pInfo->anomalySup.pMaskList);
  taosMemoryFreeClear(pInfo->anomalySup.pResultRow);
  pInfo->anomalySup.groupId = 0;
  pInfo->anomalySup.cachedRows = 0;
  pInfo->anomalySup.curWin.ekey = 0;
  pInfo->anomalySup.curWin.skey = 0;
  pInfo->anomalySup.curWinIndex = 0;
  pInfo->anomalySup.curMask = 0;

  memset(&pInfo->anomalyWinRowSup, 0, sizeof(pInfo->anomalyWinRowSup));

  if (pAnomalyNode->window.pExprs != NULL) {
    int32_t    numOfScalarExpr = 0;
    SExprInfo* pScalarExprInfo = NULL;
    TAOS_CHECK_EXIT(createExprInfo(pAnomalyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr));

    TAOS_CHECK_EXIT(
        initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore));
  }

  TAOS_CHECK_EXIT(createExprInfo(pAnomalyNode->window.pFuncs, NULL, &pExprInfo, &num));

  initResultSizeInfo(&pOper->resultInfo, 4096);

  // NOTE(review): pOper->exprSupp and its pFilterInfo are re-initialised below without an explicit cleanup
  // in this function - confirm the previous contents are released elsewhere.
  TAOS_CHECK_EXIT(initAggSup(&pOper->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, id,
                             pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore));

  // same layout as at creation time: int32_t header + aggregate result row + anomaly key bytes
  int32_t itemSize = sizeof(int32_t) + pInfo->aggSup.resultRowSize + pInfo->anomalyKey.bytes;
  pInfo->anomalySup.pResultRow = taosMemoryCalloc(1, itemSize);
  TSDB_CHECK_NULL(pInfo->anomalySup.pResultRow, code, lino, _exit, terrno);

  TAOS_CHECK_EXIT(filterInitFromNode((SNode*)pAnomalyNode->window.node.pConditions, &pOper->exprSupp.pFilterInfo, 0,
                                     pTaskInfo->pStreamRuntimeInfo));

  TAOS_CHECK_EXIT(initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window));

_exit:

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
136

137

138
/**
 * Create the anomaly-window operator described by an SAnomalyWindowPhysiNode.
 *
 * Parses the analysis options, builds the scalar and aggregate expression supporters, allocates the result
 * block, the key buffer, the result row and the block/window/mask lists, installs the filter and the
 * time-window column, and finally wires the operator callbacks and its downstream.
 *
 * @param downstream operator that supplies the input blocks
 * @param physiNode  physical plan node; read as an SAnomalyWindowPhysiNode
 * @param pTaskInfo  owning task; its code field is set when creation fails
 * @param pOptrInfo  receives the new operator on success
 * @return TSDB_CODE_SUCCESS, or the error code of the first failing step; on failure everything allocated
 *         here is released together with the downstream operator
 */
int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo,
                                        SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  size_t       keyBufSize = 0;
  int32_t      numOfFuncs = 0;
  SExprInfo*   pFuncExprs = NULL;
  SSDataBlock* pOutBlock = NULL;
  const char*  id = GET_TASKID(pTaskInfo);

  SAnomalyWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAnomalyWindowOperatorInfo));
  SOperatorInfo*              pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  SAnomalyWindowPhysiNode*    pAnomalyNode = (SAnomalyWindowPhysiNode*)physiNode;
  SColumnNode*                pKeyColNode = (SColumnNode*)(pAnomalyNode->pAnomalyKey);
  if (pOperator == NULL || pInfo == NULL) {
    code = terrno;
    goto _error;
  }

  pOperator->pPhyNode = physiNode;
  pOperator->exprSupp.hasWindowOrGroup = true;

  /* analysis options: algorithm name/url, timeout, white-noise check */
  code = initOptions(pInfo, pAnomalyNode, id);
  QUERY_CHECK_CODE(code, lino, _error);

  /* optional scalar expressions evaluated ahead of the aggregate functions */
  if (pAnomalyNode->window.pExprs != NULL) {
    SExprInfo* pScalarExprs = NULL;
    int32_t    numOfScalars = 0;

    code = createExprInfo(pAnomalyNode->window.pExprs, NULL, &pScalarExprs, &numOfScalars);
    QUERY_CHECK_CODE(code, lino, _error);

    code = initExprSupp(&pInfo->scalarSup, pScalarExprs, numOfScalars, &pTaskInfo->storageAPI.functionStore);
    QUERY_CHECK_CODE(code, lino, _error);
  }

  /* aggregate functions of the window */
  code = createExprInfo(pAnomalyNode->window.pFuncs, NULL, &pFuncExprs, &numOfFuncs);
  QUERY_CHECK_CODE(code, lino, _error);

  tstrncpy(pInfo->anomalyOpt, pAnomalyNode->anomalyOpt, sizeof(pInfo->anomalyOpt));
  setResSlot(pInfo, pAnomalyNode, pFuncExprs, numOfFuncs);

  initResultSizeInfo(&pOperator->resultInfo, 4096);

  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pFuncExprs, numOfFuncs, keyBufSize, id,
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
  QUERY_CHECK_CODE(code, lino, _error);

  /* output block */
  pOutBlock = createDataBlockFromDescNode(pAnomalyNode->window.node.pOutputDataBlockDesc);
  QUERY_CHECK_NULL(pOutBlock, code, lino, _error, terrno);
  initBasicInfo(&pInfo->binfo, pOutBlock);

  code = blockDataEnsureCapacity(pOutBlock, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _error);

  initResultRowInfo(&pInfo->binfo.resultRowInfo);
  pInfo->binfo.inputTsOrder = pAnomalyNode->window.node.inputTsOrder;
  pInfo->binfo.outputTsOrder = pAnomalyNode->window.node.outputTsOrder;

  /* the analysed column and a key buffer of matching size */
  pInfo->anomalyCol = extractColumnFromColumnNode(pKeyColNode);
  pInfo->anomalyKey.type = pInfo->anomalyCol.type;
  pInfo->anomalyKey.bytes = pInfo->anomalyCol.bytes;

  pInfo->anomalyKey.pData = taosMemoryCalloc(1, pInfo->anomalyCol.bytes);
  QUERY_CHECK_NULL(pInfo->anomalyKey.pData, code, lino, _error, terrno);

  /* result row: int32_t header + aggregate result row + anomaly key bytes */
  int32_t rowBufSize = sizeof(int32_t) + pInfo->aggSup.resultRowSize + pInfo->anomalyKey.bytes;
  pInfo->anomalySup.pResultRow = taosMemoryCalloc(1, rowBufSize);
  QUERY_CHECK_NULL(pInfo->anomalySup.pResultRow, code, lino, _error, terrno);

  /* per-group caches */
  pInfo->anomalySup.blocks = taosArrayInit(16, sizeof(SSDataBlock*));
  QUERY_CHECK_NULL(pInfo->anomalySup.blocks, code, lino, _error, terrno);

  pInfo->anomalySup.windows = taosArrayInit(16, sizeof(STimeWindow));
  QUERY_CHECK_NULL(pInfo->anomalySup.windows, code, lino, _error, terrno);

  pInfo->anomalySup.pMaskList = taosArrayInit(16, sizeof(int32_t));
  QUERY_CHECK_NULL(pInfo->anomalySup.pMaskList, code, lino, _error, terrno);

  code = filterInitFromNode((SNode*)pAnomalyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
                            pTaskInfo->pStreamRuntimeInfo);
  QUERY_CHECK_CODE(code, lino, _error);

  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
  QUERY_CHECK_CODE(code, lino, _error);

  /* wire the operator */
  setOperatorInfo(pOperator, "AnomalyWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY, true, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, anomalyAggregateNext, NULL, anomalyDestroyOperatorInfo,
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);

  setOperatorResetStateFn(pOperator, resetAnomalyWindowOperState);

  code = appendDownstream(pOperator, &downstream, 1);
  QUERY_CHECK_CODE(code, lino, _error);

  *pOptrInfo = pOperator;

  qDebug("%s anomaly_window operator is created, algo:%s url:%s opt:%s", id, pInfo->algoName, pInfo->algoUrl,
         pInfo->anomalyOpt);

  return TSDB_CODE_SUCCESS;

_error:
  qError("%s failed to create anomaly_window operator, line:%d algo:%s code:%s", id, lino, pAnomalyNode->anomalyOpt,
         tstrerror(code));

  if (pInfo != NULL) {
    anomalyDestroyOperatorInfo(pInfo);
  }

  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
  pTaskInfo->code = code;
  return code;
}
252

253
/**
 * Parse the option string of the plan node into the operator info.
 *
 * Fills pInfo->algoName / pInfo->algoUrl via taosAnalysisParseAlgo and pInfo->timeout / pInfo->wncheck from
 * the parsed option map.
 *
 * @param pInfo        operator info to fill
 * @param pAnomalyNode plan node whose anomalyOpt string is parsed
 * @param id           task id string used as log prefix
 * @return TSDB_CODE_SUCCESS, or the error returned by the option/algorithm parser
 */
int32_t initOptions(SAnomalyWindowOperatorInfo* pInfo, SAnomalyWindowPhysiNode* pAnomalyNode, const char* id) {
  SHashObj* pOptions = NULL;

  int32_t code = taosAnalyGetOpts(pAnomalyNode->anomalyOpt, &pOptions);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed to get options for creating anomaly_window operator, code:%s", id, tstrerror(code));
    goto _cleanup;
  }

  code = taosAnalysisParseAlgo(pAnomalyNode->anomalyOpt, pInfo->algoName, pInfo->algoUrl,
                               ANALY_ALGO_TYPE_ANOMALY_DETECT, tListLen(pInfo->algoUrl), pOptions, id);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed to parsing options for anomaly_window operator, code:%s", id, tstrerror(code));
    goto _cleanup;
  }

  pInfo->timeout = taosAnalysisParseTimout(pOptions, id);
  pInfo->wncheck = taosAnalysisParseWncheck(pOptions, id);

_cleanup:
  // the option map is only needed while parsing; release it on every path
  taosHashCleanup(pOptions);
  return code;
}
276

277
void setResSlot(SAnomalyWindowOperatorInfo* pInfo, SAnomalyWindowPhysiNode* pAnomalyNode, SExprInfo* pExprInfo,
×
278
                int32_t num) {
279
  pInfo->tsSlotId = ((SColumnNode*)pAnomalyNode->window.pTspk)->slotId;
×
280
  for (int32_t j = 0; j < num; ++j) {
×
281
    SExprInfo* p1 = &pExprInfo[j];
×
282
    int32_t    dstSlot = p1->base.resSchema.slotId;
×
283
    if (p1->pExpr->_function.functionType == FUNCTION_TYPE_ANOMALY_MARK) {
×
284
      pInfo->resMarkSlotId = dstSlot;
×
285
    }
286
  }
287
}
×
288

289
/**
 * getNext callback of the anomaly-window operator.
 *
 * Pulls blocks from the downstream operator and caches them while they belong to the same group. When a
 * block of another group arrives, the cached group is aggregated first and the new block starts the next
 * cache. As soon as the result block holds rows it is returned; once the downstream is exhausted the last
 * cached group is aggregated.
 *
 * @param pOperator the operator
 * @param ppRes     receives the result block, or NULL when no rows were produced
 * @return TSDB_CODE_SUCCESS; on failure the code is stored in the task and T_LONG_JMP is invoked
 */
static int32_t anomalyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SAnomalyWindowOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*              pTaskInfo = pOperator->pTaskInfo;
  SAnomalyWindowSupp*         pSupp = &pInfo->anomalySup;
  SSDataBlock*                pRes = pInfo->binfo.pRes;
  const char*                 idstr = GET_TASKID(pTaskInfo);
  int32_t                     code = TSDB_CODE_SUCCESS;
  int32_t                     lino = 0;
  int64_t                     st = taosGetTimestampUs();
  int32_t                     numOfBlocks = taosArrayGetSize(pSupp->blocks);

  blockDataCleanup(pRes);

  for (;;) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      break;
    }

    // a group id of 0 means nothing has been cached yet, so the block simply opens the first group
    bool startsNewGroup = (pSupp->groupId != 0) && (pSupp->groupId != pBlock->info.id.groupId);
    if (startsNewGroup) {
      qDebug("group:%" PRId64 ", read finish for new group coming, blocks:%d", pSupp->groupId, numOfBlocks);
      code = anomalyAggregateBlocks(pOperator);
      QUERY_CHECK_CODE(code, lino, _end);

      pSupp->groupId = pBlock->info.id.groupId;
      numOfBlocks = 1;
      pSupp->cachedRows = pBlock->info.rows;
      qDebug("group:%" PRId64 ", new group, rows:%" PRId64 ", total rows:%" PRId64, pSupp->groupId,
             pBlock->info.rows, pSupp->cachedRows);
    } else {
      pSupp->groupId = pBlock->info.id.groupId;
      numOfBlocks++;
      pSupp->cachedRows += pBlock->info.rows;
      qDebug("group:%" PRId64 ", blocks:%d, rows:%" PRId64 ", total rows:%" PRId64, pSupp->groupId, numOfBlocks,
             pBlock->info.rows, pSupp->cachedRows);
    }

    code = anomalyCacheBlock(pInfo, pBlock);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pRes->info.rows > 0) {
      (*ppRes) = pRes;
      qDebug("group:%" PRId64 ", return to upstream, blocks:%d", pRes->info.id.groupId, numOfBlocks);
      return code;
    }
  }

  // downstream exhausted: flush whatever is still cached
  if (numOfBlocks > 0) {
    qDebug("group:%" PRId64 ", read finish, blocks:%d", pInfo->anomalySup.groupId, numOfBlocks);
    code = anomalyAggregateBlocks(pOperator);
  }

  int64_t cost = taosGetTimestampUs() - st;
  qDebug("%s all groups finished, cost:%" PRId64 "us", idstr, cost);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s %s failed at line %d since %s", idstr, __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  if (pRes->info.rows == 0) {
    (*ppRes) = NULL;
  } else {
    (*ppRes) = pRes;
  }
  return code;
}
356

357
static void anomalyDestroyOperatorInfo(void* param) {
×
358
  SAnomalyWindowOperatorInfo* pInfo = (SAnomalyWindowOperatorInfo*)param;
×
359
  if (pInfo == NULL) return;
×
360

361
  cleanupBasicInfo(&pInfo->binfo);
×
362
  cleanupAggSup(&pInfo->aggSup);
×
363
  cleanupExprSupp(&pInfo->scalarSup);
×
364
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
×
365

366
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->anomalySup.blocks); ++i) {
×
367
    SSDataBlock* pBlock = taosArrayGetP(pInfo->anomalySup.blocks, i);
×
368
    blockDataDestroy(pBlock);
×
369
  }
370

371
  taosArrayDestroy(pInfo->anomalySup.blocks);
×
372
  taosArrayDestroy(pInfo->anomalySup.windows);
×
373
  taosArrayDestroy(pInfo->anomalySup.pMaskList);
×
374
  taosMemoryFreeClear(pInfo->anomalySup.pResultRow);
×
375
  taosMemoryFreeClear(pInfo->anomalyKey.pData);
×
376

377
  taosMemoryFreeClear(param);
×
378
}
379

380
/**
 * Append a copy of an input block to the per-group block cache.
 *
 * @param pInfo operator info owning the cache
 * @param pSrc  input block; it is copied, the original is not retained
 * @return 0 on success (also when the copy yields no block),
 *         TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS when the rows counted so far exceed
 *         ANALY_ANOMALY_WINDOW_MAX_ROWS, otherwise the error of the copy or of the list append
 */
static int32_t anomalyCacheBlock(SAnomalyWindowOperatorInfo* pInfo, SSDataBlock* pSrc) {
  if (pInfo->anomalySup.cachedRows > ANALY_ANOMALY_WINDOW_MAX_ROWS) {
    return TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS;
  }

  SSDataBlock* pDst = NULL;
  int32_t      code = createOneDataBlock(pSrc, true, &pDst);

  if (code != 0) return code;
  if (pDst == NULL) return code;

  if (taosArrayPush(pInfo->anomalySup.blocks, &pDst) == NULL) {
    // capture the error first, then release the copy: nobody else holds a reference to it
    code = terrno;
    blockDataDestroy(pDst);
    return code;
  }

  return 0;
}
394

395
/**
 * Locate the anomaly window containing a timestamp, searching forward from the current window index.
 *
 * On a hit, curWin / curWinIndex are moved to the matching window and curMask is set to that window's
 * mask, or to -1 when the mask list has no entry for it.
 *
 * @param pSupp window state to search and update
 * @param key   timestamp to look up; a window matches when skey <= key < ekey
 * @return 0 when a window was found, -1 otherwise (state is left unchanged)
 */
static int32_t anomalyFindWindow(SAnomalyWindowSupp* pSupp, TSKEY key) {
  for (int32_t idx = pSupp->curWinIndex; idx < taosArrayGetSize(pSupp->windows); ++idx) {
    STimeWindow* pCandidate = taosArrayGet(pSupp->windows, idx);
    if (key < pCandidate->skey || key >= pCandidate->ekey) {
      continue;
    }

    pSupp->curWin = *pCandidate;
    pSupp->curWinIndex = idx;

    // the TDgpt may not return the mask value
    int32_t* pMask = taosArrayGet(pSupp->pMaskList, idx);
    pSupp->curMask = (pMask != NULL) ? *pMask : -1;

    return 0;
  }

  return -1;
}
414

415
/**
 * Parse the response of the analysis service into anomaly windows and their optional masks.
 *
 * Fields read from the response:
 *   "rows" : number of windows; a negative value signals an error whose text is in "msg"
 *   "res"  : array of "rows" entries, each a two-element array [start, end]
 *   "mask" : optional array of "rows" integers, one per window
 *
 * Both output lists are emptied first, so that after the call mask i always belongs to window i and an
 * absent "mask" array leaves the mask list empty. A window whose start is not before its end is widened
 * to [start, start + 1).
 *
 * @param pJson    parsed response
 * @param pWindows output list of STimeWindow
 * @param pMasks   output list of int32_t masks
 * @param pId      task id string used as log prefix
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_JSON_FORMAT for a malformed response,
 *         TSDB_CODE_ANA_ANODE_RETURN_ERROR when the service reported an error, or an out-of-memory code
 */
static int32_t anomalyParseJson(SJson* pJson, SArray* pWindows, SArray* pMasks, const char* pId) {
  int32_t     code = 0;
  int32_t     rows = 0;
  STimeWindow win = {0};

  taosArrayClear(pWindows);
  // masks are appended below; without this, masks of an earlier response would stay at the front of the
  // list and be paired with the windows of this one
  if (pMasks != NULL) {
    taosArrayClear(pMasks);
  }

  tjsonGetInt32ValueFromDouble(pJson, "rows", rows, code);
  if (code < 0) {
    return TSDB_CODE_INVALID_JSON_FORMAT;
  }

  if (rows < 0) {
    // NOTE(review): pMsg is a fixed 1024-byte buffer and the getter receives no length - confirm the
    // getter bounds the copy.
    char pMsg[1024] = {0};
    code = tjsonGetStringValue(pJson, "msg", pMsg);
    if (code) {
      qError("%s failed to get error msg from rsp, unknown error", pId);
    } else {
      qError("%s failed to exec anomaly detection, msg:%s", pId, pMsg);
    }

    return TSDB_CODE_ANA_ANODE_RETURN_ERROR;
  } else if (rows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  SJson* res = tjsonGetObjectItem(pJson, "res");
  if (res == NULL) return TSDB_CODE_INVALID_JSON_FORMAT;

  SJson* pMaskObj = tjsonGetObjectItem(pJson, "mask");
  if (pMaskObj != NULL) {
    if (tjsonGetArraySize(pMaskObj) != rows) {
      qError("%s num in mask list not equals to window number", pId);
      return TSDB_CODE_INVALID_JSON_FORMAT;
    }
  }

  int32_t ressize = tjsonGetArraySize(res);
  if (ressize != rows) {
    qError("%s result in res not equals to window number", pId);
    return TSDB_CODE_INVALID_JSON_FORMAT;
  }

  for (int32_t i = 0; i < rows; ++i) {
    SJson* pRow = tjsonGetArrayItem(res, i);
    if (pRow == NULL) return TSDB_CODE_INVALID_JSON_FORMAT;

    int32_t colsize = tjsonGetArraySize(pRow);
    if (colsize != 2) return TSDB_CODE_INVALID_JSON_FORMAT;

    SJson* start = tjsonGetArrayItem(pRow, 0);
    SJson* end = tjsonGetArrayItem(pRow, 1);
    if (start == NULL || end == NULL) {
      qError("%s invalid res from analytic sys, code:%s", pId, tstrerror(TSDB_CODE_INVALID_JSON_FORMAT));
      return TSDB_CODE_INVALID_JSON_FORMAT;
    }

    tjsonGetObjectValueBigInt(start, &win.skey);
    tjsonGetObjectValueBigInt(end, &win.ekey);

    // never produce an empty window
    if (win.skey >= win.ekey) {
      win.ekey = win.skey + 1;
    }

    if (pMaskObj != NULL) {
      SJson* pOneMask = tjsonGetArrayItem(pMaskObj, i);
      if (pOneMask == NULL) {
        qError("%s invalid mask from analytic sys, code:%s", pId, tstrerror(TSDB_CODE_INVALID_JSON_FORMAT));
        return TSDB_CODE_INVALID_JSON_FORMAT;
      }

      int64_t mask = 0;
      tjsonGetObjectValueBigInt(pOneMask, &mask);

      int32_t m = mask;
      void*   p = taosArrayPush(pMasks, &m);
      if (p == NULL) {
        qError("%s failed to put mask into result list, code:%s", pId, tstrerror(terrno));
        return terrno;
      }
    }

    if (taosArrayPush(pWindows, &win) == NULL) {
      qError("%s out of memory in generating anomaly_window", pId);
      return TSDB_CODE_OUT_OF_BUFFER;
    }
  }

  int32_t numOfWins = taosArrayGetSize(pWindows);
  qDebug("%s anomaly window received, total:%d", pId, numOfWins);
  for (int32_t i = 0; i < numOfWins; ++i) {
    STimeWindow* pWindow = taosArrayGet(pWindows, i);
    qDebug("%s anomaly win:%d [%" PRId64 ", %" PRId64 ")", pId, i, pWindow->skey, pWindow->ekey);
  }

  return code;
}
507

508
/**
 * Send the cached rows of the current group to the analysis service and parse the returned windows.
 *
 * The request is assembled in an analytic buffer (file name built from the temp dir, a timestamp, the
 * state address and the group id): column meta for "ts" and "val", the timestamp column, the value column,
 * then the options "option", "algo", "prec" and "wncheck". The JSON answer is handed to anomalyParseJson,
 * which fills pSupp->windows and pSupp->pMaskList.
 *
 * @param pOperator the anomaly-window operator
 * @return 0 on success, otherwise the error code of the first failing step (also logged)
 */
static int32_t anomalyAnalysisWindow(SOperatorInfo* pOperator) {
  SAnomalyWindowOperatorInfo* pInfo = pOperator->info;
  SAnomalyWindowSupp*         pSupp = &pInfo->anomalySup;
  SAnalyticBuf                analyBuf = {.bufType = ANALYTICS_BUF_TYPE_JSON};
  SJson*                      pJson = NULL;
  int32_t                     code = 0;
  int32_t                     lino = 0;
  int64_t                     ts = taosGetTimestampNs();
  const char*                 pId = GET_TASKID(pOperator->pTaskInfo);
  int32_t                     numOfBlocks = (int32_t)taosArrayGetSize(pSupp->blocks);

  // precision string sent along with the data; milliseconds unless the column says otherwise
  const char* prec = TSDB_TIME_PRECISION_MILLI_STR;
  if (pInfo->anomalyCol.precision == TSDB_TIME_PRECISION_MICRO) {
    prec = TSDB_TIME_PRECISION_MICRO_STR;
  }
  if (pInfo->anomalyCol.precision == TSDB_TIME_PRECISION_NANO) {
    prec = TSDB_TIME_PRECISION_NANO_STR;
  }

  snprintf(analyBuf.fileName, sizeof(analyBuf.fileName), "%s/tdengine-anomaly-%" PRId64 "-%p-%" PRId64, tsTempDir, ts,
           pSupp, pSupp->groupId);
  code = tsosAnalyBufOpen(&analyBuf, 2, pId);
  QUERY_CHECK_CODE(code, lino, _OVER);

  /* column meta */
  code = taosAnalyBufWriteColMeta(&analyBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, "ts");
  QUERY_CHECK_CODE(code, lino, _OVER);

  code = taosAnalyBufWriteColMeta(&analyBuf, 1, pInfo->anomalyCol.type, "val");
  QUERY_CHECK_CODE(code, lino, _OVER);

  code = taosAnalyBufWriteDataBegin(&analyBuf);
  QUERY_CHECK_CODE(code, lino, _OVER);

  /* column 0: timestamps of every cached row */
  code = taosAnalyBufWriteColBegin(&analyBuf, 0);
  QUERY_CHECK_CODE(code, lino, _OVER);

  for (int32_t b = 0; b < numOfBlocks; ++b) {
    SSDataBlock* pBlock = taosArrayGetP(pSupp->blocks, b);
    if (pBlock == NULL) break;

    SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
    if (pTsCol == NULL) break;

    TSKEY* pTsList = (TSKEY*)pTsCol->pData;
    for (int32_t r = 0; r < pBlock->info.rows; ++r) {
      code = taosAnalyBufWriteColData(&analyBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, &pTsList[r]);
      QUERY_CHECK_CODE(code, lino, _OVER);
    }
  }

  code = taosAnalyBufWriteColEnd(&analyBuf, 0);
  QUERY_CHECK_CODE(code, lino, _OVER);

  /* column 1: values of the analysed column */
  code = taosAnalyBufWriteColBegin(&analyBuf, 1);
  QUERY_CHECK_CODE(code, lino, _OVER);

  for (int32_t b = 0; b < numOfBlocks; ++b) {
    SSDataBlock* pBlock = taosArrayGetP(pSupp->blocks, b);
    if (pBlock == NULL) break;

    SColumnInfoData* pValCol = taosArrayGet(pBlock->pDataBlock, pInfo->anomalyCol.slotId);
    if (pValCol == NULL) break;

    for (int32_t r = 0; r < pBlock->info.rows; ++r) {
      code = taosAnalyBufWriteColData(&analyBuf, 1, pValCol->info.type, colDataGetData(pValCol, r));
      QUERY_CHECK_CODE(code, lino, _OVER);
    }
  }

  code = taosAnalyBufWriteColEnd(&analyBuf, 1);
  QUERY_CHECK_CODE(code, lino, _OVER);

  code = taosAnalyBufWriteDataEnd(&analyBuf);
  QUERY_CHECK_CODE(code, lino, _OVER);

  /* request options */
  code = taosAnalyBufWriteOptStr(&analyBuf, "option", pInfo->anomalyOpt);
  QUERY_CHECK_CODE(code, lino, _OVER);

  code = taosAnalyBufWriteOptStr(&analyBuf, "algo", pInfo->algoName);
  QUERY_CHECK_CODE(code, lino, _OVER);

  code = taosAnalyBufWriteOptStr(&analyBuf, "prec", prec);
  QUERY_CHECK_CODE(code, lino, _OVER);

  code = taosAnalyBufWriteOptInt(&analyBuf, "wncheck", pInfo->wncheck);
  QUERY_CHECK_CODE(code, lino, _OVER);

  code = taosAnalyBufClose(&analyBuf);
  QUERY_CHECK_CODE(code, lino, _OVER);

  /* send and parse */
  pJson = taosAnalySendReqRetJson(pInfo->algoUrl, ANALYTICS_HTTP_TYPE_POST, &analyBuf, pInfo->timeout, pId);
  if (pJson == NULL) {
    code = terrno;
    goto _OVER;
  }

  code = anomalyParseJson(pJson, pSupp->windows, pSupp->pMaskList, pId);

_OVER:
  if (code != 0) {
    qError("%s failed to analysis window since %s, lino:%d", pId, tstrerror(code), lino);
  }

  taosAnalyBufDestroy(&analyBuf);
  if (pJson != NULL) tjsonDelete(pJson);
  return code;
}
614

615
/**
 * Apply the aggregate functions to the rows [startRowIndex, startRowIndex + numOfRows) of one block for
 * the current anomaly window.
 *
 * The shared result row is bound to the function contexts, the time-window column is updated with the
 * current window, and the current mask is written to row index 5 of that column before the functions run.
 *
 * @param pOperator the anomaly-window operator
 * @param pBlock    block whose rows are aggregated
 * @return 0 on success, otherwise the error of binding the result row or of applying the functions
 */
static int32_t anomalyAggregateRows(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  SAnomalyWindowOperatorInfo* pInfo = pOperator->info;
  SExprSupp*                  pExprSup = &pOperator->exprSupp;
  SAnomalyWindowSupp*         pSupp = &pInfo->anomalySup;
  SWindowRowsSup*             pRowSup = &pInfo->anomalyWinRowSup;
  SColumnInfoData*            pWinData = &pInfo->twAggSup.timeWindowData;

  int32_t code =
      setResultRowInitCtx(pSupp->pResultRow, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
  if (code != 0) {
    return code;
  }

  updateTimeWindowInfo(pWinData, &pSupp->curWin, 0);

  // NOTE:the sixth row is the mask value
  int64_t mask = pSupp->curMask;
  colDataSetInt64(pWinData, 5, &mask);

  return applyAggFunctionOnPartialTuples(pOperator->pTaskInfo, pExprSup->pCtx, pWinData, pRowSup->startRowIndex,
                                         pRowSup->numOfRows, pBlock->info.rows, pOperator->exprSupp.numOfExprs);
}
637

638
/**
 * Emit the shared result row of the finished window into the operator's result block.
 *
 * The row count of the result row is updated, the row is copied into the result block (whose row counter
 * is advanced only when the copy succeeded) and the init flag of the function contexts is cleared either
 * way.
 *
 * @param pOperator the anomaly-window operator
 * @return 0 on success, otherwise the error of copying the result row
 */
static int32_t anomalyBuildResult(SOperatorInfo* pOperator) {
  SAnomalyWindowOperatorInfo* pInfo = pOperator->info;
  SExprSupp*                  pSup = &pOperator->exprSupp;
  SSDataBlock*                pOut = pInfo->binfo.pRes;
  SResultRow*                 pRow = pInfo->anomalySup.pResultRow;

  doUpdateNumOfRows(pSup->pCtx, pRow, pSup->numOfExprs, pSup->rowEntryInfoOffset);

  int32_t code = copyResultrowToDataBlock(pSup->pExprInfo, pSup->numOfExprs, pRow, pSup->pCtx, pOut,
                                          pSup->rowEntryInfoOffset, pOperator->pTaskInfo);
  if (code == 0) {
    pOut->info.rows += pRow->numOfRows;
  }

  // the contexts are reused for the next window regardless of the outcome
  clearResultRowInitFlag(pSup->pCtx, pSup->numOfExprs);
  return code;
}
655

656
/*
 * Aggregate all blocks cached in pInfo->anomalySup for the current group.
 *
 * Flow:
 *   1. If no block is cached, go straight to the common cleanup.
 *   2. Call anomalyAnalysisWindow(); afterwards the windows are read from
 *      pSupp->windows and, if pSupp->pMaskList is set, the mask of the first
 *      window is read from it (-1 when the entry is missing).
 *   3. Make sure the result block can hold at least one row per window.
 *   4. Walk every row of every cached block:
 *        - a row whose timestamp lies in [curWin.skey, curWin.ekey] is kept
 *          for the current window;
 *        - otherwise the pending rows are aggregated (anomalyAggregateRows),
 *          the window result is emitted (anomalyBuildResult), and
 *          anomalyFindWindow() is asked for a window matching the row.
 *      Pending rows are also aggregated at the last row of each block, and the
 *      pending window is emitted after the last block.
 *   5. Apply the operator's filter to the result block.
 *
 * Counters:
 *   rowsInBlock - rows of the current window seen in the current block that
 *                 have not been passed to anomalyAggregateRows() yet.
 *   rowsInWin   - rows of the current window (possibly spanning blocks) whose
 *                 result has not been emitted yet.
 *
 * On every path (success or failure) the cached blocks are destroyed, the
 * block/window/mask arrays are cleared and the window cursor is reset.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error code reported by a callee.
 */
static int32_t anomalyAggregateBlocks(SOperatorInfo* pOperator) {
  int32_t                     code = TSDB_CODE_SUCCESS;
  int32_t                     lino = 0;
  SAnomalyWindowOperatorInfo* pInfo = pOperator->info;
  SExprSupp*                  pExprSup = &pOperator->exprSupp;
  SSDataBlock*                pRes = pInfo->binfo.pRes;
  SAnomalyWindowSupp*         pSupp = &pInfo->anomalySup;
  SWindowRowsSup*             pRowSup = &pInfo->anomalyWinRowSup;
  int32_t                     rowsInWin = 0;
  int32_t                     rowsInBlock = 0;
  int32_t                     numOfWins = 0;
  const int64_t               gid = pSupp->groupId;
  const int32_t               order = pInfo->binfo.inputTsOrder;

  int32_t numOfBlocks = (int32_t)taosArrayGetSize(pSupp->blocks);
  if (numOfBlocks == 0) goto _OVER;

  qDebug("group:%" PRId64 ", aggregate blocks, blocks:%d", pSupp->groupId, numOfBlocks);
  pRes->info.id.groupId = pSupp->groupId;

  code = anomalyAnalysisWindow(pOperator);
  QUERY_CHECK_CODE(code, lino, _OVER);

  numOfWins = (int32_t)taosArrayGetSize(pSupp->windows);
  qDebug("group:%" PRId64 ", wins:%d, rows:%" PRId64, pSupp->groupId, numOfWins, pSupp->cachedRows);
  for (int32_t w = 0; w < numOfWins; ++w) {
    STimeWindow* pWindow = taosArrayGet(pSupp->windows, w);
    if (w == 0) {
      // the first window becomes the current window of the row scan below
      pSupp->curWin = *pWindow;
      pRowSup->win.skey = pSupp->curWin.skey;
      pSupp->curWinIndex = w;
      if (pSupp->pMaskList != NULL) {
        void* p = taosArrayGet(pSupp->pMaskList, w);
        if (p != NULL) {
          pSupp->curMask = *(int32_t*)p;
        } else {
          pSupp->curMask = -1;
        }
      }
    }
    qDebug("group:%" PRId64 ", win:%d [%" PRId64 ", %" PRId64 ")", pSupp->groupId, w, pWindow->skey, pWindow->ekey);
  }

  if (numOfWins <= 0) goto _OVER;
  if (numOfWins > pRes->info.capacity) {
    code = blockDataEnsureCapacity(pRes, numOfWins);
    QUERY_CHECK_CODE(code, lino, _OVER);
  }

  for (int32_t b = 0; b < numOfBlocks; ++b) {
    SSDataBlock* pBlock = taosArrayGetP(pSupp->blocks, b);
    // NOTE(review): a NULL block (and a missing column below) stops the scan without an error code
    if (pBlock == NULL) break;

    pRes->info.scanFlag = pBlock->info.scanFlag;

    // Failures jump to the cleanup label rather than `break`: leaving only this loop would
    // let the doFilter() call below overwrite the error code and hide the failure.
    code = setInputDataBlock(pExprSup, pBlock, order, MAIN_SCAN, true);
    QUERY_CHECK_CODE(code, lino, _OVER);

    code = blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
    QUERY_CHECK_CODE(code, lino, _OVER);

    // there is a scalar expression that needs to be calculated right before applying the group aggregation.
    if (pInfo->scalarSup.pExprInfo != NULL) {
      code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
                                   pInfo->scalarSup.numOfExprs, NULL, GET_STM_RTINFO(pOperator->pTaskInfo));
      QUERY_CHECK_CODE(code, lino, _OVER);
    }

    // the anomaly column is only checked for presence here; its values are not read in this function
    SColumnInfoData* pValCol = taosArrayGet(pBlock->pDataBlock, pInfo->anomalyCol.slotId);
    if (pValCol == NULL) break;
    SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
    if (pTsCol == NULL) break;
    TSKEY* tsList = (TSKEY*)pTsCol->pData;
    bool   lastBlock = (b == numOfBlocks - 1);

    qTrace("group:%" PRId64 ", block:%d win:%d, riwin:%d riblock:%d, rows:%" PRId64, pSupp->groupId, b,
           pSupp->curWinIndex, rowsInWin, rowsInBlock, pBlock->info.rows);

    for (int32_t r = 0; r < pBlock->info.rows; ++r) {
      TSKEY key = tsList[r];
      bool  keyInWin = (key >= pSupp->curWin.skey && key <= pSupp->curWin.ekey);
      bool  lastRow = (r == pBlock->info.rows - 1);

      if (keyInWin) {
        // only the first few rows of a block are traced
        if (r < 5) {
          qTrace("group:%" PRId64 ", block:%d win:%d, row:%d ts:%" PRId64 ", riwin:%d riblock:%d", pSupp->groupId, b,
                 pSupp->curWinIndex, r, key, rowsInWin, rowsInBlock);
        }

        // first row of the current window inside this block
        if (rowsInBlock == 0) {
          doKeepNewWindowStartInfo(pRowSup, tsList, r, gid);
        }
        doKeepTuple(pRowSup, tsList[r], r, gid);
        rowsInBlock++;
        rowsInWin++;
      } else {
        // the row left the current window: flush pending rows, then emit the window result
        if (rowsInBlock > 0) {
          qTrace("group:%" PRId64 ", block:%d win:%d, row:%d ts:%" PRId64 ", riwin:%d riblock:%d, agg", pSupp->groupId,
                 b, pSupp->curWinIndex, r, key, rowsInWin, rowsInBlock);
          code = anomalyAggregateRows(pOperator, pBlock);
          QUERY_CHECK_CODE(code, lino, _OVER);
          rowsInBlock = 0;
        }
        if (rowsInWin > 0) {
          qTrace("group:%" PRId64 ", block:%d win:%d, row:%d ts:%" PRId64 ", riwin:%d riblock:%d, build result",
                 pSupp->groupId, b, pSupp->curWinIndex, r, key, rowsInWin, rowsInBlock);
          code = anomalyBuildResult(pOperator);
          QUERY_CHECK_CODE(code, lino, _OVER);
          rowsInWin = 0;
        }
        if (anomalyFindWindow(pSupp, tsList[r]) == 0) {
          // a window matching this row was found: the row starts it
          qTrace("group:%" PRId64 ", block:%d win:%d, row:%d ts:%" PRId64 ", riwin:%d riblock:%d, new window detect",
                 pSupp->groupId, b, pSupp->curWinIndex, r, key, rowsInWin, rowsInBlock);
          doKeepNewWindowStartInfo(pRowSup, tsList, r, gid);
          doKeepTuple(pRowSup, tsList[r], r, gid);
          rowsInBlock = 1;
          rowsInWin = 1;
        } else {
          qTrace("group:%" PRId64 ", block:%d win:%d, row:%d ts:%" PRId64 ", riwin:%d riblock:%d, window not found",
                 pSupp->groupId, b, pSupp->curWinIndex, r, key, rowsInWin, rowsInBlock);
          rowsInBlock = 0;
          rowsInWin = 0;
        }
      }

      // pending rows must be aggregated before moving on to the next block
      if (lastRow && rowsInBlock > 0) {
        qTrace("group:%" PRId64 ", block:%d win:%d, row:%d ts:%" PRId64 ", riwin:%d riblock:%d, agg since lastrow",
               pSupp->groupId, b, pSupp->curWinIndex, r, key, rowsInWin, rowsInBlock);
        code = anomalyAggregateRows(pOperator, pBlock);
        QUERY_CHECK_CODE(code, lino, _OVER);
        rowsInBlock = 0;
      }
    }

    // no more input: emit the window that is still open
    if (lastBlock && rowsInWin > 0) {
      qTrace("group:%" PRId64 ", block:%d win:%d, riwin:%d riblock:%d, build result since lastblock", pSupp->groupId, b,
             pSupp->curWinIndex, rowsInWin, rowsInBlock);
      code = anomalyBuildResult(pOperator);
      QUERY_CHECK_CODE(code, lino, _OVER);
      rowsInWin = 0;
    }
  }

  code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
  QUERY_CHECK_CODE(code, lino, _OVER);

_OVER:
  for (int32_t i = 0; i < numOfBlocks; ++i) {
    SSDataBlock* pBlock = taosArrayGetP(pSupp->blocks, i);
    // the debug line dereferences pBlock, so skip empty slots
    if (pBlock == NULL) continue;
    qDebug("%s, clear block, pBlock:%p pBlock->pDataBlock:%p", __func__, pBlock, pBlock->pDataBlock);
    blockDataDestroy(pBlock);
  }

  taosArrayClear(pSupp->blocks);
  taosArrayClear(pSupp->windows);
  taosArrayClear(pSupp->pMaskList);

  pSupp->cachedRows = 0;
  pSupp->curWin.ekey = 0;
  pSupp->curWin.skey = 0;
  pSupp->curWinIndex = 0;

  return code;
}
821

822
#else
823

824
/*
 * Stub compiled when USE_ANALYTICS is not defined.
 * No operator is created and none of the arguments is read or written;
 * the call always returns TSDB_CODE_OPS_NOT_SUPPORT.
 */
int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo,
                                        SOperatorInfo** pOptrInfo) {
  (void)downstream;
  (void)physiNode;
  (void)pTaskInfo;
  (void)pOptrInfo;

  const int32_t code = TSDB_CODE_OPS_NOT_SUPPORT;
  return code;
}
828
/* Stub compiled when USE_ANALYTICS is not defined: does nothing with param. */
void destroyForecastInfo(void* param) {
  (void)param;
}
829

830
#endif
831

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc