• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4674

18 Aug 2025 07:58AM UTC coverage: 59.821% (+0.1%) from 59.715%
#4674

push

travis-ci

web-flow
test: update case desc (#32551)

136937 of 292075 branches covered (46.88%)

Branch coverage included in aggregate %.

207916 of 284395 relevant lines covered (73.11%)

4553289.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.18
/source/libs/executor/src/forecastoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "querytask.h"
21
#include "tanalytics.h"
22
#include "taoserror.h"
23
#include "tcommon.h"
24
#include "tdatablock.h"
25
#include "tmsg.h"
26

27
#ifdef USE_ANALYTICS
28

29
// Option keys used when talking to the analytics node or when reading the user-supplied option string.
#define ALGO_OPT_RETCONF_NAME      "return_conf"  // request flag: 1 when a low/high confidence column is selected
30
#define ALGO_OPT_FORECASTROWS_NAME "rows"  // user option: number of rows to forecast
31
#define ALGO_OPT_CONF_NAME         "conf"  // NOTE(review): presumably the confidence-level option key - confirm
32
#define ALGO_OPT_START_NAME        "start"  // NOTE(review): presumably the forecast start-timestamp option key - confirm
33
#define ALGO_OPT_EVERY_NAME        "every"  // NOTE(review): presumably the forecast step option key - confirm
34

35
// One "dynamic real" covariate column together with its future values; entries are stored in
// SForecastSupp.pDynamicRealList.
typedef struct {
36
  char*           pName;  // NOTE(review): presumably the column name; not referenced in the visible code
37
  SColumnInfoData data;  // future values, appended to the request buffer when the buffer is closed
38
  SColumn         col;  // slot id and type of the matching input column
39
  int32_t         numOfRows;  // number of future values held in `data`
40
} SColFutureData;
41

42
// Per-operator forecast state: parsed options, input/output slot mapping, running statistics of the
// cached input rows, and the buffer that carries the request to the analytics node.
typedef struct {
43
  char         algoName[TSDB_ANALYTIC_ALGO_NAME_LEN];  // algorithm name resolved from the option string
44
  char         algoUrl[TSDB_ANALYTIC_ALGO_URL_LEN];  // URL the forecast request is posted to
45
  char*        pOptions;  // duplicated option string (or the default "algo=holtwinters")
46
  int64_t      maxTs;  // largest timestamp cached so far
47
  int64_t      minTs;  // smallest timestamp cached so far
48
  int64_t      numOfRows;  // number of rows written into analyBuf
49
  uint64_t     groupId;  // group currently being accumulated
50
  int64_t      forecastRows;  // number of rows to forecast
51
  int64_t      cachedRows;  // input rows accumulated for the current group
52
  int32_t      numOfBlocks;  // input blocks cached since the last aggregation
53
  int64_t      timeout;  // timeout handed to the HTTP request
54
  int16_t      resTsSlot;  // output slot of the forecast timestamp, -1 if not selected
55
  int16_t      resValSlot;  // output slot of the forecast value, -1 if not selected
56
  int16_t      resLowSlot;  // output slot of the lower confidence bound, -1 if not selected
57
  int16_t      resHighSlot;  // output slot of the upper confidence bound, -1 if not selected
58
  int16_t      inputTsSlot;  // input slot of the primary timestamp column
59
  int16_t      targetValSlot;  // input slot of the column being forecast
60
  int8_t       targetValType;  // data type of the column being forecast
61
  int8_t       inputPrecision;  // precision of the input timestamp column
62
  int8_t       wncheck;  // white-noise check flag forwarded to the analytics node
63
  double       conf;  // confidence value sent as the "conf" option
64
  int64_t      startTs;  // explicit forecast start timestamp, used when setStart != 0
65
  int64_t      every;  // explicit forecast step, used when setEvery != 0
66
  int8_t       setStart;  // non-zero when startTs is to be used instead of the derived start
67
  int8_t       setEvery;  // non-zero when every is to be used instead of the derived step
68
  SArray*      pCovariateSlotList;   // list of SColumn, one per past covariate input column
69
  SArray*      pDynamicRealList;     // list of SColFutureData, one per dynamic real column
70
  int32_t      numOfInputCols;  // 2 (timestamp + target) plus the number of covariate columns
71
  SAnalyticBuf analyBuf;  // buffer holding the request data and options
72
} SForecastSupp;
73

74
// Operator-private data of the forecast operator.
typedef struct SForecastOperatorInfo {
75
  SSDataBlock*  pRes;  // result block returned to the upstream operator
76
  SExprSupp     scalarSup;  // scalar expressions applied to every input block before it is cached
77
  SForecastSupp forecastSupp;  // forecast options and accumulated input state
78
} SForecastOperatorInfo;
79

80
// Forward declarations; both functions are defined further down in this file.
static void destroyForecastInfo(void* param);
81
static int32_t forecastParseOpt(SForecastSupp* pSupp, const char* id);
82

83
// Grow the result block only when it is already full.
//
// While the block still has free rows (rows < capacity) nothing is done. Otherwise
// blockDataEnsureCapacity() is called with `newRowsNum`, and a failure is logged and returned.
// NOTE(review): `newRowsNum` is forwarded unchanged, so whether it is an absolute capacity or an
// increment depends on blockDataEnsureCapacity() - confirm against its contract.
static FORCE_INLINE int32_t forecastEnsureBlockCapacity(SSDataBlock* pBlock, int32_t newRowsNum) {
  int32_t code = TSDB_CODE_SUCCESS;

  if (pBlock->info.rows >= pBlock->info.capacity) {
    code = blockDataEnsureCapacity(pBlock, newRowsNum);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    }
  }

  return code;
}
96

97
// Append every row of an input block to the analytics request buffer.
//
// For each row the columns are written in a fixed order: primary timestamp, target value, the
// dynamic real columns, then the past covariate columns. The running min/max timestamp and the
// row counter in `pSupp` are updated along the way.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS when the cached row count is above
// the limit, the error reported for a missing input column, or the first buffer write error.
static int32_t forecastCacheBlock(SForecastSupp* pSupp, SSDataBlock* pBlock, const char* id) {
  int32_t       code = TSDB_CODE_SUCCESS;
  SAnalyticBuf* pBuf = &pSupp->analyBuf;

  if (pSupp->cachedRows > ANALY_FORECAST_MAX_ROWS) {
    code = TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS;
    qError("%s rows:%" PRId64 " for forecast cache, error happens, code:%s, upper limit:%d", id, pSupp->cachedRows,
           tstrerror(code), ANALY_FORECAST_MAX_ROWS);
    return code;
  }

  pSupp->numOfBlocks++;
  qDebug("%s block:%d, %p rows:%" PRId64, id, pSupp->numOfBlocks, pBlock, pBlock->info.rows);

  // nothing to cache, and no column lookup is needed for an empty block
  if (pBlock->info.rows <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  // the column lookups do not depend on the row, so resolve them once
  SColumnInfoData* pValCol = taosArrayGet(pBlock->pDataBlock, pSupp->targetValSlot);
  SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSupp->inputTsSlot);
  if (pTsCol == NULL || pValCol == NULL) {
    // a missing input column must fail the query instead of silently caching nothing
    code = terrno;
    qError("%s failed to get input columns for forecast, ts slot:%d, value slot:%d, code:%s", id,
           (int32_t)pSupp->inputTsSlot, (int32_t)pSupp->targetValSlot, tstrerror(code));
    return code;
  }

  int16_t valType = pValCol->info.type;
  int32_t numOfDynamic = (int32_t)taosArrayGetSize(pSupp->pDynamicRealList);
  int32_t numOfCovariate = (int32_t)taosArrayGetSize(pSupp->pCovariateSlotList);

  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
    int32_t index = 0;
    int64_t ts = ((TSKEY*)pTsCol->pData)[j];
    char*   val = colDataGetData(pValCol, j);

    pSupp->minTs = MIN(pSupp->minTs, ts);
    pSupp->maxTs = MAX(pSupp->maxTs, ts);
    pSupp->numOfRows++;

    // write the primary time stamp column data
    code = taosAnalyBufWriteColData(pBuf, index++, TSDB_DATA_TYPE_TIMESTAMP, &ts);
    if (TSDB_CODE_SUCCESS != code) {
      qError("%s failed to write ts in buf, code:%s", id, tstrerror(code));
      return code;
    }

    // write the main column for forecasting
    code = taosAnalyBufWriteColData(pBuf, index++, valType, val);
    if (TSDB_CODE_SUCCESS != code) {
      qError("%s failed to write val in buf, code:%s", id, tstrerror(code));
      return code;
    }

    // the dynamic real columns follow the target column
    for (int32_t i = 0; i < numOfDynamic; ++i) {
      SColFutureData*  pCol = taosArrayGet(pSupp->pDynamicRealList, i);
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pCol->col.slotId);
      if (pColData == NULL) {
        code = terrno;
        qError("%s failed to write val in buf, code:%s", id, tstrerror(code));
        return code;
      }

      char* pVal = colDataGetData(pColData, j);
      code = taosAnalyBufWriteColData(pBuf, index++, pCol->col.type, pVal);
      if (TSDB_CODE_SUCCESS != code) {
        qError("%s failed to write val in buf, code:%s", id, tstrerror(code));
        return code;
      }
    }

    // the past covariate columns come last
    for (int32_t i = 0; i < numOfCovariate; ++i) {
      SColumn*         pCol = taosArrayGet(pSupp->pCovariateSlotList, i);
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
      if (pColData == NULL) {
        code = terrno;
        qError("%s failed to write val in buf, code:%s", id, tstrerror(code));
        return code;
      }

      char* pVal = colDataGetData(pColData, j);
      code = taosAnalyBufWriteColData(pBuf, index++, pCol->type, pVal);
      if (TSDB_CODE_SUCCESS != code) {
        qError("%s failed to write val in buf, code:%s", id, tstrerror(code));
        return code;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
169

170
// Finish the analytics request buffer: append the future values of the dynamic real columns,
// terminate every column and the data section, write the request options and close the buffer.
//
// The forecast step ("every") is the user value when one was set, otherwise the average spacing of
// the cached rows; the start timestamp defaults to maxTs + every.
//
// Returns 0 on success, TSDB_CODE_ANA_ANODE_NOT_ENOUGH_ROWS when too few history rows were cached,
// or the first error reported by the buffer writer.
static int32_t forecastCloseBuf(SForecastSupp* pSupp, const char* id) {
  SAnalyticBuf* pBuf = &pSupp->analyBuf;
  int32_t       code = 0;

  // fail fast: nothing below is useful when the history is too short
  if (pSupp->cachedRows < ANALY_FORECAST_MIN_ROWS) {
    qError("%s history rows for forecasting not enough, min required:%d, current:%" PRId64, id, ANALY_FORECAST_MIN_ROWS,
           pSupp->cachedRows);
    return TSDB_CODE_ANA_ANODE_NOT_ENOUGH_ROWS;
  }

  for (int32_t i = 0; i < pSupp->numOfInputCols; ++i) {
    // columns 0 and 1 are the primary timestamp and the forecast column;
    // the dynamic real columns start at index 2 and get their future values appended here
    if ((i >= 2) && ((i - 2) < taosArrayGetSize(pSupp->pDynamicRealList))) {
      SColFutureData* pCol = taosArrayGet(pSupp->pDynamicRealList, i - 2);
      int16_t         valType = pCol->col.type;
      for (int32_t j = 0; j < pCol->numOfRows; ++j) {
        char* pVal = colDataGetData(&pCol->data, j);
        code = taosAnalyBufWriteColData(pBuf, i, valType, pVal);
        if (code != 0) {
          return code;
        }
      }
    }

    code = taosAnalyBufWriteColEnd(pBuf, i);
    if (code != 0) return code;
  }

  code = taosAnalyBufWriteDataEnd(pBuf);
  if (code != 0) return code;

  code = taosAnalyBufWriteOptStr(pBuf, "option", pSupp->pOptions);
  if (code != 0) return code;

  code = taosAnalyBufWriteOptStr(pBuf, "algo", pSupp->algoName);
  if (code != 0) return code;

  // milliseconds unless the input column says otherwise
  const char* prec = TSDB_TIME_PRECISION_MILLI_STR;
  if (pSupp->inputPrecision == TSDB_TIME_PRECISION_MICRO) prec = TSDB_TIME_PRECISION_MICRO_STR;
  if (pSupp->inputPrecision == TSDB_TIME_PRECISION_NANO) prec = TSDB_TIME_PRECISION_NANO_STR;
  code = taosAnalyBufWriteOptStr(pBuf, "prec", prec);
  if (code != 0) return code;

  code = taosAnalyBufWriteOptInt(pBuf, ALGO_OPT_WNCHECK_NAME, pSupp->wncheck);
  if (code != 0) return code;

  // confidence bounds are requested only when a low or high output column was selected
  bool noConf = (pSupp->resHighSlot == -1 && pSupp->resLowSlot == -1);
  code = taosAnalyBufWriteOptInt(pBuf, ALGO_OPT_RETCONF_NAME, !noConf);
  if (code != 0) return code;

  code = taosAnalyBufWriteOptInt(pBuf, "forecast_rows", pSupp->forecastRows);
  if (code != 0) return code;

  code = taosAnalyBufWriteOptFloat(pBuf, "conf", pSupp->conf);
  if (code != 0) return code;

  int64_t every = 0;
  if (pSupp->setEvery != 0) {
    every = pSupp->every;
  } else {
    // the average spacing needs at least two rows; guard the division
    if (pSupp->numOfRows < 2) {
      qError("%s history rows for forecasting not enough, min required:%d, current:%" PRId64, id,
             ANALY_FORECAST_MIN_ROWS, pSupp->numOfRows);
      return TSDB_CODE_ANA_ANODE_NOT_ENOUGH_ROWS;
    }
    every = (pSupp->maxTs - pSupp->minTs) / (pSupp->numOfRows - 1);
  }

  code = taosAnalyBufWriteOptInt(pBuf, "every", every);
  if (code != 0) return code;

  int64_t start = (pSupp->setStart != 0) ? pSupp->startTs : pSupp->maxTs + every;
  code = taosAnalyBufWriteOptInt(pBuf, "start", start);
  if (code != 0) return code;

  if (taosArrayGetSize(pSupp->pCovariateSlotList) + taosArrayGetSize(pSupp->pDynamicRealList) > 0) {
    code = taosAnalyBufWriteOptStr(pBuf, "type", "covariate");
    if (code != 0) return code;
  }

  code = taosAnalyBufClose(pBuf);
  return code;
}
246

247
static int32_t forecastAnalysis(SForecastSupp* pSupp, SSDataBlock* pBlock, const char* pId) {
27✔
248
  SAnalyticBuf* pBuf = &pSupp->analyBuf;
27✔
249
  int32_t       resCurRow = pBlock->info.rows;
27✔
250
  int8_t        tmpI8 = 0;
27✔
251
  int16_t       tmpI16 = 0;
27✔
252
  int32_t       tmpI32 = 0;
27✔
253
  int64_t       tmpI64 = 0;
27✔
254
  double        tmpDouble = 0;
27✔
255
  int32_t       code = 0;
27✔
256

257
  SColumnInfoData* pResValCol = taosArrayGet(pBlock->pDataBlock, pSupp->resValSlot);
27✔
258
  if (NULL == pResValCol) {
27!
259
    return terrno;
×
260
  }
261

262
  SColumnInfoData* pResTsCol = ((pSupp->resTsSlot != -1) ? taosArrayGet(pBlock->pDataBlock, pSupp->resTsSlot) : NULL);
27✔
263
  SColumnInfoData* pResLowCol =
27✔
264
      ((pSupp->resLowSlot != -1) ? taosArrayGet(pBlock->pDataBlock, pSupp->resLowSlot) : NULL);
27✔
265
  SColumnInfoData* pResHighCol =
27✔
266
      (pSupp->resHighSlot != -1 ? taosArrayGet(pBlock->pDataBlock, pSupp->resHighSlot) : NULL);
27✔
267

268
  SJson* pJson = taosAnalySendReqRetJson(pSupp->algoUrl, ANALYTICS_HTTP_TYPE_POST, pBuf, pSupp->timeout);
27✔
269
  if (pJson == NULL) {
27!
270
    return terrno;
×
271
  }
272

273
  int32_t rows = 0;
27✔
274
  tjsonGetInt32ValueFromDouble(pJson, "rows", rows, code);
27✔
275
  if (rows < 0 && code == 0) {
27!
276
    code = TSDB_CODE_ANA_ANODE_RETURN_ERROR;
×
277

278
    char pMsg[1024] = {0};
×
279
    code = tjsonGetStringValue(pJson, "msg", pMsg);
×
280
    if (code != 0) {
×
281
      qError("%s failed to get msg from rsp, unknown error", pId);
×
282
    } else {
283
      qError("%s failed to exec forecast, msg:%s", pId, pMsg);
×
284
      void* p = strstr(pMsg, "white noise");
×
285
      if (p != NULL) {
×
286
        code = TSDB_CODE_ANA_WN_DATA;
×
287
      } else {
288
        p = strstr(pMsg, "[Errno 111] Connection refused");
×
289
        if (p != NULL) {  // the specified forecast model not loaded yet
×
290
          code = TSDB_CODE_ANA_ALGO_NOT_LOAD;
×
291
        }
292
      }
293
    }
294

295
    tjsonDelete(pJson);
×
296
    return code;
×
297
  }
298

299
  // invalid json format
300
  if (code != 0) {
27!
301
    goto _OVER;
×
302
  }
303

304
  SJson* res = tjsonGetObjectItem(pJson, "res");
27✔
305
  if (res == NULL) goto _OVER;
27!
306
  int32_t ressize = tjsonGetArraySize(res);
27✔
307
  bool    returnConf = (pSupp->resHighSlot != -1 || pSupp->resLowSlot != -1);
27!
308

309
  if ((returnConf && (ressize != 4)) || ((!returnConf) && (ressize != 2))) {
27!
310
    goto _OVER;
×
311
  }
312

313
  if (pResTsCol != NULL) {
27✔
314
    resCurRow = pBlock->info.rows;
9✔
315
    SJson* tsJsonArray = tjsonGetArrayItem(res, 0);
9✔
316
    if (tsJsonArray == NULL) goto _OVER;
9!
317
    int32_t tsSize = tjsonGetArraySize(tsJsonArray);
9✔
318
    if (tsSize != rows) goto _OVER;
9!
319
    for (int32_t i = 0; i < tsSize; ++i) {
94✔
320
      SJson* tsJson = tjsonGetArrayItem(tsJsonArray, i);
85✔
321
      tjsonGetObjectValueBigInt(tsJson, &tmpI64);
85✔
322
      colDataSetInt64(pResTsCol, resCurRow, &tmpI64);
85✔
323
      resCurRow++;
85✔
324
    }
325
  }
326

327
  if (pResLowCol != NULL) {
27✔
328
    resCurRow = pBlock->info.rows;
9✔
329
    SJson* lowJsonArray = tjsonGetArrayItem(res, 2);
9✔
330
    if (lowJsonArray == NULL) goto _OVER;
9!
331
    int32_t lowSize = tjsonGetArraySize(lowJsonArray);
9✔
332
    if (lowSize != rows) goto _OVER;
9!
333
    for (int32_t i = 0; i < lowSize; ++i) {
94✔
334
      SJson* lowJson = tjsonGetArrayItem(lowJsonArray, i);
85✔
335
      tjsonGetObjectValueDouble(lowJson, &tmpDouble);
85✔
336
      colDataSetDouble(pResLowCol, resCurRow, &tmpDouble);
85✔
337
      resCurRow++;
85✔
338
    }
339
  }
340

341
  if (pResHighCol != NULL) {
27✔
342
    resCurRow = pBlock->info.rows;
9✔
343
    SJson* highJsonArray = tjsonGetArrayItem(res, 3);
9✔
344
    if (highJsonArray == NULL) goto _OVER;
9!
345
    int32_t highSize = tjsonGetArraySize(highJsonArray);
9✔
346
    if (highSize != rows) goto _OVER;
9!
347
    for (int32_t i = 0; i < highSize; ++i) {
94✔
348
      SJson* highJson = tjsonGetArrayItem(highJsonArray, i);
85✔
349
      tjsonGetObjectValueDouble(highJson, &tmpDouble);
85✔
350
      colDataSetDouble(pResHighCol, resCurRow, &tmpDouble);
85✔
351
      resCurRow++;
85✔
352
    }
353
  }
354

355
  resCurRow = pBlock->info.rows;
27✔
356
  SJson* valJsonArray = tjsonGetArrayItem(res, 1);
27✔
357
  if (valJsonArray == NULL) goto _OVER;
27!
358
  int32_t valSize = tjsonGetArraySize(valJsonArray);
27✔
359
  if (valSize != rows) goto _OVER;
27!
360
  for (int32_t i = 0; i < valSize; ++i) {
321✔
361
    SJson* valJson = tjsonGetArrayItem(valJsonArray, i);
294✔
362
    tjsonGetObjectValueDouble(valJson, &tmpDouble);
294✔
363

364
    colDataSetDouble(pResValCol, resCurRow, &tmpDouble);
294✔
365
    resCurRow++;
294✔
366
  }
367

368
  pBlock->info.rows += rows;
27✔
369

370
  if (pJson != NULL) tjsonDelete(pJson);
27!
371
  return 0;
27✔
372

373
_OVER:
×
374
  tjsonDelete(pJson);
×
375
  if (code == 0) {
×
376
    code = TSDB_CODE_INVALID_JSON_FORMAT;
×
377
  }
378

379
  qError("%s failed to perform forecast finalize since %s", pId, tstrerror(code));
×
380
  return code;
×
381
}
382

383
// Run the forecast for the rows cached so far: close the request buffer, make sure the result
// block has room, send the request and append the returned rows to `pResBlock`.
//
// Whatever the outcome, the per-group accumulation state (block counter, timestamp range, row
// counter) is reset and the request buffer is destroyed, so statistics of one group never leak
// into the derived "every"/"start" of the next one.
//
// Returns TSDB_CODE_SUCCESS or the first error of the steps above.
static int32_t forecastAggregateBlocks(SForecastSupp* pSupp, SSDataBlock* pResBlock, const char* pId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  code = forecastCloseBuf(pSupp, pId);
  QUERY_CHECK_CODE(code, lino, _end);

  code = forecastEnsureBlockCapacity(pResBlock, 1);
  QUERY_CHECK_CODE(code, lino, _end);

  code = forecastAnalysis(pSupp, pResBlock, pId);
  QUERY_CHECK_CODE(code, lino, _end);

  uInfo("%s block:%d, forecast finalize", pId, pSupp->numOfBlocks);

_end:
  pSupp->numOfBlocks = 0;

  // same initial values as initForecastOpt() uses for the running statistics
  pSupp->maxTs = 0;
  pSupp->minTs = INT64_MAX;
  pSupp->numOfRows = 0;

  taosAnalyBufDestroy(&pSupp->analyBuf);
  return code;
}
404

405
// Operator entry point: pull blocks from the downstream operator, cache them per group and emit
// the forecast of a group once all of its blocks have been read.
//
// A forecast is produced either when a block of a different group arrives (the finished group is
// forecast, the new block starts the next group, and the result is returned right away) or when
// the downstream is exhausted. `*ppRes` is set to the result block, or to NULL when it is empty.
//
// Every failure is logged, stored in pTaskInfo->code and raised with T_LONG_JMP.
static int32_t forecastNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                code = TSDB_CODE_SUCCESS;
  int32_t                lino = 0;
  SExecTaskInfo*         pTaskInfo = pOperator->pTaskInfo;
  SForecastOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*           pResBlock = pInfo->pRes;
  SForecastSupp*         pSupp = &pInfo->forecastSupp;
  SExprSupp*             pScalarSupp = &pInfo->scalarSup;
  int64_t                st = taosGetTimestampUs();
  int32_t                numOfBlocks = pSupp->numOfBlocks;
  const char*            pId = GET_TASKID(pOperator->pTaskInfo);

  blockDataCleanup(pResBlock);

  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      break;
    }

    if (pScalarSupp->pExprInfo != NULL) {
      code = projectApplyFunctions(pScalarSupp->pExprInfo, pBlock, pBlock, pScalarSupp->pCtx, pScalarSupp->numOfExprs,
                                   NULL, GET_STM_RTINFO(pOperator->pTaskInfo));
      // go through the common error path so the failure is logged and recorded in the task
      QUERY_CHECK_CODE(code, lino, _end);
    }

    if (pSupp->groupId == 0 || pSupp->groupId == pBlock->info.id.groupId) {
      // first block, or another block of the group being accumulated
      pSupp->groupId = pBlock->info.id.groupId;
      numOfBlocks++;
      pSupp->cachedRows += pBlock->info.rows;
      qDebug("%s group:%" PRId64 ", blocks:%d, rows:%" PRId64 ", total rows:%" PRId64, pId, pSupp->groupId, numOfBlocks,
             pBlock->info.rows, pSupp->cachedRows);
      code = forecastCacheBlock(pSupp, pBlock, pId);
      QUERY_CHECK_CODE(code, lino, _end);
    } else {
      // a new group starts: forecast the finished one, then start caching the new one
      qDebug("%s group:%" PRId64 ", read finish for new group coming, blocks:%d", pId, pSupp->groupId, numOfBlocks);
      code = forecastAggregateBlocks(pSupp, pResBlock, pId);
      QUERY_CHECK_CODE(code, lino, _end);
      pSupp->groupId = pBlock->info.id.groupId;
      numOfBlocks = 1;
      pSupp->cachedRows = pBlock->info.rows;
      qDebug("%s group:%" PRId64 ", new group, rows:%" PRId64 ", total rows:%" PRId64, pId, pSupp->groupId,
             pBlock->info.rows, pSupp->cachedRows);
      code = forecastCacheBlock(pSupp, pBlock, pId);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    if (pResBlock->info.rows > 0) {
      (*ppRes) = pResBlock;
      qDebug("%s group:%" PRId64 ", return to upstream, blocks:%d", pId, pResBlock->info.id.groupId, numOfBlocks);
      return code;
    }
  }

  if (numOfBlocks > 0) {
    qDebug("%s group:%" PRId64 ", read finish, blocks:%d", pId, pSupp->groupId, numOfBlocks);
    code = forecastAggregateBlocks(pSupp, pResBlock, pId);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  int64_t cost = taosGetTimestampUs() - st;
  qDebug("%s all groups finished, cost:%" PRId64 "us", pId, cost);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s %s failed at line %d since %s", pId, __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  (*ppRes) = (pResBlock->info.rows == 0) ? NULL : pResBlock;
  return code;
}
481

482
// Map the output expressions to result slots.
//
// All four result slots start at -1 ("not selected"); every expression whose function type is one
// of the forecast pseudo functions stores its destination slot id in the matching field. Other
// function types are ignored. Always returns 0.
static int32_t forecastParseOutput(SForecastSupp* pSupp, SExprSupp* pExprSup) {
  pSupp->resTsSlot = -1;
  pSupp->resValSlot = -1;
  pSupp->resLowSlot = -1;
  pSupp->resHighSlot = -1;

  for (int32_t idx = 0; idx < pExprSup->numOfExprs; ++idx) {
    SExprInfo* pExpr = pExprSup->pExprInfo + idx;
    int32_t    slot = pExpr->base.resSchema.slotId;

    switch (pExpr->pExpr->_function.functionType) {
      case FUNCTION_TYPE_FORECAST:
        pSupp->resValSlot = slot;
        break;
      case FUNCTION_TYPE_FORECAST_ROWTS:
        pSupp->resTsSlot = slot;
        break;
      case FUNCTION_TYPE_FORECAST_LOW:
        pSupp->resLowSlot = slot;
        break;
      case FUNCTION_TYPE_FORECAST_HIGH:
        pSupp->resHighSlot = slot;
        break;
      default:
        break;
    }
  }

  return 0;
}
505

506
// Sanity-check the parameter list of a forecast function node.
//
// Accepted shapes:
//   - exactly two parameters, both columns (target column, timestamp column);
//   - three or more parameters, where the first and the last one are columns.
// No parameter may be NULL. Any violation is logged and reported as TSDB_CODE_PLAN_INTERNAL_ERROR;
// 0 is returned when the list is acceptable.
static int32_t validInputParams(SFunctionNode* pFunc, const char* id) {
  int32_t code = 0;
  int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);

  do {
    if (numOfParams < 2) {
      qError("%s invalid number of parameters:%d", id, numOfParams);
      code = TSDB_CODE_PLAN_INTERNAL_ERROR;
      break;
    }

    for (int32_t i = 0; i < numOfParams; ++i) {
      if (nodesListGetNode(pFunc->pParameterList, i) == NULL) {
        code = TSDB_CODE_PLAN_INTERNAL_ERROR;
        qError("%s %d-th parameter in forecast function is NULL, code:%s", id, i, tstrerror(code));
        break;
      }
    }

    if (code != 0) {
      break;
    }

    SNode* pFirst = nodesListGetNode(pFunc->pParameterList, 0);

    if (numOfParams == 2) {  // column_name, timestamp_column_name
      SNode* pSecond = nodesListGetNode(pFunc->pParameterList, 1);

      if (!(nodeType(pFirst) == QUERY_NODE_COLUMN && nodeType(pSecond) == QUERY_NODE_COLUMN)) {
        qError("%s invalid column type, column 1:%d, column 2:%d", id, nodeType(pFirst), nodeType(pSecond));
        code = TSDB_CODE_PLAN_INTERNAL_ERROR;
      }
      break;
    }

    // three or more parameters:
    // column_name_#1, column_name_#2...., analytics_options, timestamp_column_name, primary_key_column if exists
    // column_name_#1, timestamp_column_name, primary_key_column if exists
    // column_name_#1, analytics_options, timestamp_column_name
    if (nodeType(pFirst) != QUERY_NODE_COLUMN) {
      code = TSDB_CODE_PLAN_INTERNAL_ERROR;
      qError("%s first parameter is not valid column in forecast function", id);
      break;
    }

    SNode* pLast = nodesListGetNode(pFunc->pParameterList, numOfParams - 1);
    if (nodeType(pLast) != QUERY_NODE_COLUMN) {
      qError("%s last parameter is not valid column, actual:%d", id, nodeType(pLast));
      code = TSDB_CODE_PLAN_INTERNAL_ERROR;
    }
  } while (0);

  if (code) {
    qError("%s valid the parameters failed, code:%s", id, tstrerror(code));
  }
  return code;
}
558

559
// Tell whether a column with the given slot id has already been recorded in the covariate list.
static bool existInList(SForecastSupp* pSupp, int32_t slotId) {
  int32_t idx = 0;

  while (idx < taosArrayGetSize(pSupp->pCovariateSlotList)) {
    SColumn* pEntry = taosArrayGet(pSupp->pCovariateSlotList, idx);
    if (pEntry->slotId == slotId) {
      return true;
    }
    ++idx;
  }

  return false;
}
570

571
// Extract the input description of the forecast function from the function list.
//
// For the FORECAST function node this records the target column (slot and type), the primary
// timestamp column (slot and precision), the option string (defaulting to "algo=holtwinters")
// and the list of past covariate columns. numOfInputCols ends up as 2 plus the number of
// covariate columns.
//
// Returns 0 on success, the validation error of validInputParams(), or the allocation error
// reported through terrno.
static int32_t forecastParseInput(SForecastSupp* pSupp, SNodeList* pFuncs, const char* id) {
  int32_t code = 0;
  SNode*  pNode = NULL;

  pSupp->inputTsSlot = -1;
  pSupp->targetValSlot = -1;
  pSupp->targetValType = -1;
  pSupp->inputPrecision = -1;

  FOREACH(pNode, pFuncs) {
    if ((nodeType(pNode) != QUERY_NODE_TARGET) || (nodeType(((STargetNode*)pNode)->pExpr) != QUERY_NODE_FUNCTION)) {
      continue;
    }

    SFunctionNode* pFunc = (SFunctionNode*)((STargetNode*)pNode)->pExpr;
    int32_t        numOfParam = LIST_LENGTH(pFunc->pParameterList);

    if (pFunc->funcType != FUNCTION_TYPE_FORECAST) {
      continue;
    }

    code = validInputParams(pFunc, id);
    if (code) {
      return code;
    }

    // the primary timestamp and the target column are always sent
    pSupp->numOfInputCols = 2;

    if (numOfParam == 2) {
      // column, ts
      SColumnNode* pTarget = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
      SColumnNode* pTsNode = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 1);

      pSupp->inputTsSlot = pTsNode->slotId;
      pSupp->inputPrecision = pTsNode->node.resType.precision;
      pSupp->targetValSlot = pTarget->slotId;
      pSupp->targetValType = pTarget->node.resType.type;

      // let's add the holtwinters as the default forecast algorithm
      pSupp->pOptions = taosStrdup("algo=holtwinters");
      if (pSupp->pOptions == NULL) {
        qError("%s failed to dup forecast option, code:%s", id, tstrerror(terrno));
        return terrno;
      }

      continue;
    }

    SColumnNode* pTarget = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
    bool         assignTs = false;
    bool         assignOpt = false;

    pSupp->targetValSlot = pTarget->slotId;
    pSupp->targetValType = pTarget->node.resType.type;

    // set the primary ts column and option info: the first primary-ts column and the first value
    // node win, later ones are ignored
    for (int32_t i = 0; i < numOfParam; ++i) {
      SNode* pParam = nodesListGetNode(pFunc->pParameterList, i);

      if (nodeType(pParam) == QUERY_NODE_COLUMN) {
        SColumnNode* pColNode = (SColumnNode*)pParam;
        if (pColNode->isPrimTs && (!assignTs)) {
          pSupp->inputTsSlot = pColNode->slotId;
          pSupp->inputPrecision = pColNode->node.resType.precision;
          assignTs = true;
        }
      } else if ((nodeType(pParam) == QUERY_NODE_VALUE) && (!assignOpt)) {
        SValueNode* pOptNode = (SValueNode*)pParam;
        pSupp->pOptions = taosStrdup(pOptNode->literal);
        if (pSupp->pOptions == NULL) {
          qError("%s failed to dup forecast option, code:%s", id, tstrerror(terrno));
          return terrno;
        }
        assignOpt = true;
      }
    }

    if (!assignOpt) {
      // set the default forecast option
      pSupp->pOptions = taosStrdup("algo=holtwinters");
      if (pSupp->pOptions == NULL) {
        qError("%s failed to dup forecast option, code:%s", id, tstrerror(terrno));
        return terrno;
      }
    }

    pSupp->pCovariateSlotList = taosArrayInit(4, sizeof(SColumn));
    pSupp->pDynamicRealList = taosArrayInit(4, sizeof(SColFutureData));
    if (pSupp->pCovariateSlotList == NULL || pSupp->pDynamicRealList == NULL) {
      qError("%s failed to init covariate lists, code:%s", id, tstrerror(terrno));
      return terrno;
    }

    // the first is the target column; covariates follow until the first non-column parameter or
    // the primary timestamp column
    for (int32_t i = 1; i < numOfParam; ++i) {
      SColumnNode* p = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, i);
      if ((nodeType(p) != QUERY_NODE_COLUMN) || p->isPrimTs) {
        break;
      }

      if (p->slotId == pSupp->targetValSlot) {
        continue;  // duplicate the target column, ignore it
      }

      if (existInList(pSupp, p->slotId)) {
        continue;  // duplicate column, ignore it
      }

      SColumn col = {.slotId = p->slotId,
                     .colType = p->colType,
                     .type = p->node.resType.type,
                     .bytes = p->node.resType.bytes};

      tstrncpy(col.name, p->colName, tListLen(col.name));
      void* pRet = taosArrayPush(pSupp->pCovariateSlotList, &col);
      if (pRet == NULL) {
        // a covariate that cannot be recorded would silently change the request; stop here
        code = terrno;
        qError("failed to record the covariate slot index, since %s", tstrerror(code));
        return code;
      }
    }

    pSupp->numOfInputCols += taosArrayGetSize(pSupp->pCovariateSlotList);
  }

  return code;
}
687

688
// Put the forecast state into its initial condition before the option string is parsed.
static void initForecastOpt(SForecastSupp* pSupp) {
  // running statistics of the cached input rows
  pSupp->numOfRows = 0;
  pSupp->minTs = INT64_MAX;
  pSupp->maxTs = 0;

  // neither "start" nor "every" has been supplied yet
  pSupp->setStart = 0;
  pSupp->setEvery = 0;

  // option defaults
  pSupp->conf = ANALY_FORECAST_DEFAULT_CONF;
  pSupp->forecastRows = ANALY_FORECAST_DEFAULT_ROWS;
  pSupp->wncheck = ANALY_FORECAST_DEFAULT_WNCHECK;
}
41✔
698

699
// Reject the combination of covariate columns with an algorithm that cannot use them.
//
// Returns TSDB_CODE_ANA_NOT_SUPPORT_FORECAST when at least one covariate column is present and the
// algorithm name equals (case-insensitively) "holtwinters", "arima" or "timemoe-fc"; otherwise
// TSDB_CODE_SUCCESS.
static int32_t filterNotSupportForecast(SForecastSupp* pSupp) {
  static const char* const unsupported[] = {"holtwinters", "arima", "timemoe-fc"};

  if (taosArrayGetSize(pSupp->pCovariateSlotList) == 0) {
    return TSDB_CODE_SUCCESS;
  }

  for (size_t i = 0; i < sizeof(unsupported) / sizeof(unsupported[0]); ++i) {
    if (taosStrcasecmp(pSupp->algoName, unsupported[i]) == 0) {
      return TSDB_CODE_ANA_NOT_SUPPORT_FORECAST;
    }
  }

  return TSDB_CODE_SUCCESS;
}
712

713

714
static int32_t forecastParseOpt(SForecastSupp* pSupp, const char* id) {
41✔
715
  int32_t   code = 0;
41✔
716
  int32_t   lino = 0;
41✔
717
  SHashObj* pHashMap = NULL;
41✔
718

719
  initForecastOpt(pSupp);
41✔
720

721
  code = taosAnalyGetOpts(pSupp->pOptions, &pHashMap);
41✔
722
  if (code != TSDB_CODE_SUCCESS) {
41!
723
    return code;
×
724
  }
725

726
  if (taosHashGetSize(pHashMap) == 0) {
41!
727
    code = TSDB_CODE_INVALID_PARA;
×
728
    qError("%s no valid options for forecast, failed to exec", id);
×
729
    return code;
×
730
  }
731

732
  if (taosHashGetSize(pHashMap) == 0) {
41!
733
    code = TSDB_CODE_INVALID_PARA;
×
734
    qError("%s no valid options for forecast, failed to exec", id);
×
735
    return code;
×
736
  }
737

738
  code = taosAnalysisParseAlgo(pSupp->pOptions, pSupp->algoName, pSupp->algoUrl, ANALY_ALGO_TYPE_FORECAST,
41✔
739
                               tListLen(pSupp->algoUrl), pHashMap, id);
740
  TSDB_CHECK_CODE(code, lino, _end);
41✔
741

742
  code = filterNotSupportForecast(pSupp);
40✔
743
  if (code) {
40✔
744
    qError("%s not support forecast model, %s", id, pSupp->algoName);
2!
745
    TSDB_CHECK_CODE(code, lino, _end);
2!
746
  }
747

748
  // extract the timeout parameter
749
  pSupp->timeout = taosAnalysisParseTimout(pHashMap, id);
38✔
750
  pSupp->wncheck = taosAnalysisParseWncheck(pHashMap, id);
38✔
751

752
  // extract the forecast rows
753
  char* pRows = taosHashGet(pHashMap, ALGO_OPT_FORECASTROWS_NAME, strlen(ALGO_OPT_FORECASTROWS_NAME));
38✔
754
  if (pRows != NULL) {
38✔
755
    int64_t v = 0;
19✔
756
    code = toInteger(pRows, taosHashGetValueSize(pRows), 10, &v);
19✔
757

758
    pSupp->forecastRows = v;
19✔
759
    qDebug("%s forecast rows:%"PRId64, id, pSupp->forecastRows);
19!
760
  } else {
761
    qDebug("%s forecast rows not found:%s, use default:%" PRId64, id, pSupp->pOptions, pSupp->forecastRows);
19!
762
  }
763

764
  if (pSupp->forecastRows > ANALY_FORECAST_RES_MAX_ROWS) {
38!
765
    qError("%s required too many forecast rows, max allowed:%d, required:%" PRId64, id, ANALY_FORECAST_RES_MAX_ROWS,
×
766
           pSupp->forecastRows);
767
    code = TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS;
×
768
    goto _end;
×
769
  }
770

771
  if (pSupp->forecastRows <= 0) {
38✔
772
    qError("%s output rows should be greater than 0, input:%" PRId64, id, pSupp->forecastRows);
3!
773
    code = TSDB_CODE_INVALID_PARA;
3✔
774
    goto _end;
3✔
775
  }
776

777
  // extract the confidence interval value
778
  char* pConf = taosHashGet(pHashMap, ALGO_OPT_CONF_NAME, strlen(ALGO_OPT_CONF_NAME));
35✔
779
  if (pConf != NULL) {
35✔
780
    char*  endPtr = NULL;
17✔
781
    double v = taosStr2Double(pConf, &endPtr);
17✔
782
    pSupp->conf = v;
17✔
783

784
    if (v <= 0 || v > 1.0) {
17!
785
      pSupp->conf = ANALY_FORECAST_DEFAULT_CONF;
×
786
      qWarn("%s valid conf range is (0, 1], user specified:%.2f out of range, set the default:%.2f", id, v,
×
787
             pSupp->conf);
788
    } else {
789
      qDebug("%s forecast conf:%.2f", id, pSupp->conf);
17!
790
    }
791
  } else {
792
    qDebug("%s forecast conf not found:%s, use default:%.2f", id, pSupp->pOptions, pSupp->conf);
18!
793
  }
794

795
  // extract the start timestamp
796
  char* pStart = taosHashGet(pHashMap, ALGO_OPT_START_NAME, strlen(ALGO_OPT_START_NAME));
35✔
797
  if (pStart != NULL) {
35✔
798
    int64_t v = 0;
2✔
799
    code = toInteger(pStart, taosHashGetValueSize(pStart), 10, &v);
2✔
800
    pSupp->startTs = v;
2✔
801
    pSupp->setStart = 1;
2✔
802
    qDebug("%s forecast set start ts:%"PRId64, id, pSupp->startTs);
2!
803
  }
804

805
  // extract the time step
806
  char* pEvery = taosHashGet(pHashMap, ALGO_OPT_EVERY_NAME, strlen(ALGO_OPT_EVERY_NAME));
35✔
807
  if (pEvery != NULL) {
35✔
808
    int64_t v = 0;
3✔
809
    code = toInteger(pEvery, taosHashGetValueSize(pEvery), 10, &v);
3✔
810
    pSupp->every = v;
3✔
811
    pSupp->setEvery = 1;
3✔
812
    qDebug("%s forecast set every ts:%"PRId64, id, pSupp->every);
3!
813
  }
814

815
  if (pSupp->setEvery && pSupp->every <= 0) {
35✔
816
    qError("%s period should be greater than 0, user specified:%"PRId64, id, pSupp->every);
1!
817
    code = TSDB_CODE_INVALID_PARA;
1✔
818
    goto _end;
1✔
819
  }
820

821
  // extract the dynamic real feature for covariate forecasting
822
  void*       pIter = NULL;
34✔
823
  size_t      keyLen = 0;
34✔
824
  const char* p = "dynamic_real_";
34✔
825

826
  while ((pIter = taosHashIterate(pHashMap, pIter))) {
112✔
827
    const char* pVal = pIter;
85✔
828
    char*       pKey = taosHashGetKey((void*)pVal, &keyLen);
85✔
829
    int32_t     idx = 0;
85✔
830
    char        nameBuf[512] = {0};
85✔
831

832
    if (strncmp(pKey, p, strlen(p)) == 0) {
85✔
833

834
      if (strncmp(&pKey[keyLen - 4], "_col", 4) == 0) {
15✔
835
        continue;
6✔
836
      }
837

838
      int32_t ret = sscanf(pKey, "dynamic_real_%d", &idx);
9✔
839
      if (ret == 0) {
9!
840
        continue;
×
841
      }
842

843
      memcpy(nameBuf, pKey, keyLen);
9✔
844
      strncpy(&nameBuf[keyLen], "_col", strlen("_col"));
9✔
845

846
      void* pCol = taosHashGet(pHashMap, nameBuf, strlen(nameBuf));
9✔
847
      if (pCol == NULL) {
9✔
848
        char* pTmp = taosStrndupi(pKey, keyLen);
5✔
849
        qError("%s dynamic real column related:%s column name:%s not specified", id, pTmp, nameBuf);
5!
850
        
851
        taosMemoryFree(pTmp);
5!
852
        code = TSDB_CODE_INVALID_PARA;
5✔
853
        goto _end;
7✔
854
      } else {
855
        // build dynamic_real_feature
856
        SColFutureData d = {.pName = taosStrndupi(pCol, taosHashGetValueSize(pCol))};
4✔
857
        if (d.pName == NULL) {
4!
858
          qError("%s failed to clone the future dynamic real column name:%s", id, (char*) pCol);
×
859
          code = terrno;
×
860
          goto _end;
2✔
861
        }
862

863
        int32_t index = -1;
4✔
864
        for (int32_t i = 0; i < taosArrayGetSize(pSupp->pCovariateSlotList); ++i) {
8✔
865
          SColumn* pColx = taosArrayGet(pSupp->pCovariateSlotList, i);
6✔
866
          if (strcmp(pColx->name, d.pName) == 0) {
6✔
867
            index = i;
2✔
868
            break;
2✔
869
          }
870
        }
871

872
        if (index == -1) {
4✔
873
          qError("%s not found the required future dynamic real column:%s", id, d.pName);
2!
874
          code = TSDB_CODE_INVALID_PARA;
2✔
875
          taosMemoryFree(d.pName);
2!
876
          goto _end;
2✔
877
        }
878

879
        SColumn* pColx = taosArrayGet(pSupp->pCovariateSlotList, index);
2✔
880
        d.data.info.slotId = pColx->slotId;
2✔
881
        d.data.info.type = pColx->type;
2✔
882
        d.data.info.bytes = pColx->bytes;
2✔
883

884
        int32_t len = taosHashGetValueSize((void*)pVal);
2✔
885
        char*   buf = taosStrndupi(pVal, len);
2✔
886
        int32_t unused = strdequote((char*)buf);
2✔
887

888
        int32_t num = 0;
2✔
889
        char**  pList = strsplit(buf, " ", &num);
2✔
890
        if (num != pSupp->forecastRows) {
2!
891
          qError("%s the rows:%d of future dynamic real column data is not equalled to the forecasting rows:%" PRId64,
×
892
                 id, num, pSupp->forecastRows);
893
          code = TSDB_CODE_INVALID_PARA;
×
894

895
          taosMemoryFree(d.pName);
×
896
          taosMemoryFree(pList);
×
897
          taosMemoryFree(buf);
×
898
          goto _end;
×
899
        }
900

901
        d.numOfRows = num;
2✔
902

903
        code = colInfoDataEnsureCapacity(&d.data, num, true);
2✔
904
        if (code != 0) {
2!
905
          qError("%s failed to prepare buffer, code:%s", id, tstrerror(code));
×
906
          goto _end;
×
907
        }
908

909
        for (int32_t j = 0; j < num; ++j) {
105✔
910
          char* ps = NULL;
103✔
911
          if (j == 0) {
103✔
912
            ps = strstr(pList[j], "[") + 1;
2✔
913
          } else {
914
            ps = pList[j];
101✔
915
          }
916

917
          code = 0;
103✔
918

919
          switch(pColx->type) {
103!
920
            case TSDB_DATA_TYPE_TINYINT: {
×
921
              int8_t t1 = taosStr2Int8(ps, NULL, 10);
×
922
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
923
              break;
×
924
            }
925
            case TSDB_DATA_TYPE_SMALLINT: {
×
926
              int16_t t1 = taosStr2Int16(ps, NULL, 10);
×
927
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
928
              break;
×
929
            }
930
            case TSDB_DATA_TYPE_INT: {
103✔
931
              int32_t t1 = taosStr2Int32(ps, NULL, 10);
103✔
932
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
103✔
933
              break;
103✔
934
            }
935
            case TSDB_DATA_TYPE_BIGINT: {
×
936
              int64_t t1 = taosStr2Int64(ps, NULL, 10);
×
937
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
938
              break;
×
939
            }
940
            case TSDB_DATA_TYPE_FLOAT: {
×
941
              float t1 = taosStr2Float(ps, NULL);
×
942
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
943
              break;
×
944
            }
945
            case TSDB_DATA_TYPE_DOUBLE: {
×
946
              double t1 = taosStr2Double(ps, NULL);
×
947
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
948
              break;
×
949
            }
950
            case TSDB_DATA_TYPE_UTINYINT: {
×
951
              uint8_t t1 = taosStr2UInt8(ps, NULL, 10);
×
952
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
953
              break;
×
954
            }
955
            case TSDB_DATA_TYPE_USMALLINT: {
×
956
              uint16_t t1 = taosStr2UInt16(ps, NULL, 10);
×
957
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
958
              break;
×
959
            }
960
            case TSDB_DATA_TYPE_UINT: {
×
961
              uint32_t t1 = taosStr2UInt32(ps, NULL, 10);
×
962
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
963
              break;
×
964
            }
965
            case TSDB_DATA_TYPE_UBIGINT: {
×
966
              uint64_t t1 = taosStr2UInt64(ps, NULL, 10);
×
967
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
968
              break;
×
969
            }
970
          }
971

972
          if (code != 0) {
103!
973
            break;
×
974
          }
975
        }
976

977
        taosMemoryFree(pList);
2!
978
        taosMemoryFree(buf);
2!
979
        if (code != 0) {
2!
980
          goto _end;
×
981
        }
982

983
        void* noret = taosArrayPush(pSupp->pDynamicRealList, &d);
2✔
984
        if (noret == NULL) {
2!
985
          qError("%s failed to add column info in dynamic real column info", id);
×
986
          code = terrno;
×
987
          goto _end;
×
988
        }
989
      }
990
    }
991
  }
992

993
_end:
27✔
994
  taosHashCleanup(pHashMap);
41✔
995
  return code;
41✔
996
}
997

998
// Open the analytic buffer of pSupp and write the column meta of every input column.
// Column layout: "ts", "val", then one "dynamic_real_N" column for every covariate that has
// user-supplied future values, then one "past_dynamic_real_N" column for every remaining covariate.
// Covariates matched to future data are moved out of pSupp->pCovariateSlotList into the matching
// entry of pSupp->pDynamicRealList. On failure the buffer is closed and destroyed.
static int32_t forecastCreateBuf(SForecastSupp* pSupp) {
  SAnalyticBuf* pAnalyBuf = &pSupp->analyBuf;
  int64_t       fileTs = 0;  // taosGetTimestampMs();
  int32_t       nextCol = 0;
  int32_t       code = 0;

  pAnalyBuf->bufType = ANALYTICS_BUF_TYPE_JSON_COL;
  snprintf(pAnalyBuf->fileName, sizeof(pAnalyBuf->fileName), "%s/tdengine-forecast-%" PRId64, tsTempDir, fileTs);

  // timestamp column + target column + all covariate columns
  int32_t totalCols = taosArrayGetSize(pSupp->pCovariateSlotList) + 2;

  code = tsosAnalyBufOpen(pAnalyBuf, totalCols);
  if (code != 0) {
    goto _cleanup;
  }

  code = taosAnalyBufWriteColMeta(pAnalyBuf, nextCol++, TSDB_DATA_TYPE_TIMESTAMP, "ts");
  if (code != 0) {
    goto _cleanup;
  }

  code = taosAnalyBufWriteColMeta(pAnalyBuf, nextCol++, pSupp->targetValType, "val");
  if (code != 0) {
    goto _cleanup;
  }

  int32_t numOfFuture = taosArrayGetSize(pSupp->pDynamicRealList);
  int32_t numOfPast = taosArrayGetSize(pSupp->pCovariateSlotList);

  if (numOfPast >= numOfFuture) {
    // covariates with future values
    for (int32_t i = 0; i < numOfFuture; ++i) {
      SColFutureData* pFuture = taosArrayGet(pSupp->pDynamicRealList, i);

      for (int32_t k = 0; k < taosArrayGetSize(pSupp->pCovariateSlotList); ++k) {
        SColumn* pCov = taosArrayGet(pSupp->pCovariateSlotList, k);
        if (strcmp(pCov->name, pFuture->pName) != 0) {
          continue;
        }

        char name[128] = {0};
        (void)tsnprintf(name, tListLen(name), "dynamic_real_%d", i + 1);

        code = taosAnalyBufWriteColMeta(pAnalyBuf, nextCol++, pCov->type, name);
        if (code != 0) {
          goto _cleanup;
        }

        // keep a copy of the column description before the slot is dropped from the covariate list
        memcpy(&pFuture->col, pCov, sizeof(SColumn));
        taosArrayRemove(pSupp->pCovariateSlotList, k);
        break;
      }
    }

    // whatever is left in the covariate list only has historical values
    numOfPast = taosArrayGetSize(pSupp->pCovariateSlotList);
    for (int32_t j = 0; j < numOfPast; ++j) {
      SColumn* pCov = taosArrayGet(pSupp->pCovariateSlotList, j);

      char name[128] = {0};
      (void)tsnprintf(name, tListLen(name), "past_dynamic_real_%d", j + 1);

      code = taosAnalyBufWriteColMeta(pAnalyBuf, nextCol++, pCov->type, name);
      if (code) {
        goto _cleanup;
      }
    }
  }

  code = taosAnalyBufWriteDataBegin(pAnalyBuf);
  if (code != 0) {
    goto _cleanup;
  }

  for (int32_t c = 0; c < pSupp->numOfInputCols; ++c) {
    code = taosAnalyBufWriteColBegin(pAnalyBuf, c);
    if (code != 0) {
      goto _cleanup;
    }
  }

_cleanup:
  if (code != 0) {
    (void)taosAnalyBufClose(pAnalyBuf);
    taosAnalyBufDestroy(pAnalyBuf);
  }
  return code;
}
1070

1071
// Reset-state callback of the forecast operator: marks the operator as not opened, drops the result
// rows, the covariate slot list, the analytic buffer and both expression supports, and then rebuilds
// them from the physical plan node in the same order used when the operator is created.
// Returns 0 on success or the code of the first step that failed (which is also logged).
static int32_t resetForecastOperState(SOperatorInfo* pOper) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  SExecTaskInfo*          pTaskInfo = pOper->pTaskInfo;
  const char*             idStr = pOper->pTaskInfo->id.str;
  SForecastOperatorInfo*  pFcInfo = pOper->info;
  SForecastSupp*          pSupp = &pFcInfo->forecastSupp;
  SForecastFuncPhysiNode* pNode = (SForecastFuncPhysiNode*)pOper->pPhyNode;
  SExprInfo*              pExprInfo = NULL;
  int32_t                 numOfExprs = 0;

  pOper->status = OP_NOT_OPENED;

  // release the state built for the previous run
  blockDataCleanup(pFcInfo->pRes);

  taosArrayDestroy(pSupp->pCovariateSlotList);
  pSupp->pCovariateSlotList = NULL;

  taosAnalyBufDestroy(&pSupp->analyBuf);

  cleanupExprSupp(&pOper->exprSupp);
  cleanupExprSupp(&pFcInfo->scalarSup);

  // rebuild the expression support and the filter
  TAOS_CHECK_EXIT(createExprInfo(pNode->pFuncs, NULL, &pExprInfo, &numOfExprs));
  TAOS_CHECK_EXIT(initExprSupp(&pOper->exprSupp, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore));
  TAOS_CHECK_EXIT(filterInitFromNode((SNode*)pNode->node.pConditions, &pOper->exprSupp.pFilterInfo, 0,
                                     pTaskInfo->pStreamRuntimeInfo));

  // parse input columns, output columns and options again, then reopen the analytic buffer
  TAOS_CHECK_EXIT(forecastParseInput(pSupp, pNode->pFuncs, idStr));
  TAOS_CHECK_EXIT(forecastParseOutput(pSupp, &pOper->exprSupp));
  TAOS_CHECK_EXIT(forecastParseOpt(pSupp, idStr));
  TAOS_CHECK_EXIT(forecastCreateBuf(pSupp));

  if (pNode->pExprs != NULL) {
    SExprInfo* pScalarExprInfo = NULL;
    int32_t    numOfScalar = 0;
    TAOS_CHECK_EXIT(createExprInfo(pNode->pExprs, NULL, &pScalarExprInfo, &numOfScalar));
    TAOS_CHECK_EXIT(
        initExprSupp(&pFcInfo->scalarSup, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore));
  }

  initResultSizeInfo(&pOper->resultInfo, 4096);

_exit:
  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
1125

1126

1127

1128

1129
// Create the forecast operator on top of `downstream`.
//   downstream: the single child operator providing the input rows
//   pPhyNode:   physical plan node, used as a SForecastFuncPhysiNode
//   pTaskInfo:  owning task; its `code` field receives the error code on failure
//   pOptrInfo:  [out] receives the new operator on success
// Returns TSDB_CODE_SUCCESS, or an error code after releasing the partially built operator, its
// info object and the downstream operator.
int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
                                   SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t code = 0;
  int32_t lino = 0;

  // pId is printed by the _error path, so it has to be initialized before the first "goto _error"
  const char* pId = pTaskInfo->id.str;

  SForecastOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SForecastOperatorInfo));
  SOperatorInfo*         pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pOperator == NULL || pInfo == NULL) {
    code = terrno;
    goto _error;
  }

  pOperator->pPhyNode = pPhyNode;

  SForecastSupp*          pSupp = &pInfo->forecastSupp;
  SForecastFuncPhysiNode* pForecastPhyNode = (SForecastFuncPhysiNode*)pPhyNode;
  SExprSupp*              pExprSup = &pOperator->exprSupp;
  int32_t                 numOfExprs = 0;
  SExprInfo*              pExprInfo = NULL;

  code = createExprInfo(pForecastPhyNode->pFuncs, NULL, &pExprInfo, &numOfExprs);
  QUERY_CHECK_CODE(code, lino, _error);

  code = initExprSupp(pExprSup, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore);
  QUERY_CHECK_CODE(code, lino, _error);

  if (pForecastPhyNode->pExprs != NULL) {
    int32_t    num = 0;
    SExprInfo* pScalarExprInfo = NULL;
    code = createExprInfo(pForecastPhyNode->pExprs, NULL, &pScalarExprInfo, &num);
    QUERY_CHECK_CODE(code, lino, _error);

    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, num, &pTaskInfo->storageAPI.functionStore);
    QUERY_CHECK_CODE(code, lino, _error);
  }

  code = filterInitFromNode((SNode*)pForecastPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
                            pTaskInfo->pStreamRuntimeInfo);
  QUERY_CHECK_CODE(code, lino, _error);

  // input columns, output slots and options have to be parsed before the analytic buffer is opened
  code = forecastParseInput(pSupp, pForecastPhyNode->pFuncs, pId);
  QUERY_CHECK_CODE(code, lino, _error);

  code = forecastParseOutput(pSupp, pExprSup);
  QUERY_CHECK_CODE(code, lino, _error);

  code = forecastParseOpt(pSupp, pId);
  QUERY_CHECK_CODE(code, lino, _error);

  code = forecastCreateBuf(pSupp);
  QUERY_CHECK_CODE(code, lino, _error);

  initResultSizeInfo(&pOperator->resultInfo, 4096);

  pInfo->pRes = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
  QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);

  setOperatorInfo(pOperator, "ForecastOperator", QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, forecastNext, NULL, destroyForecastInfo, optrDefaultBufFn,
                                         NULL, optrDefaultGetNextExtFn, NULL);

  setOperatorResetStateFn(pOperator, resetForecastOperState);

  code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _error);

  code = appendDownstream(pOperator, &downstream, 1);
  QUERY_CHECK_CODE(code, lino, _error);

  *pOptrInfo = pOperator;

  qDebug("%s forecast env is initialized, option:%s", pId, pSupp->pOptions);
  return TSDB_CODE_SUCCESS;

_error:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s %s failed at line %d since %s", pId, __func__, lino, tstrerror(code));
  }
  // NOTE(review): when the failure happens after setOperatorInfo()/createOperatorFpSet() registered
  // pInfo and destroyForecastInfo with pOperator, pInfo is destroyed here and pOperator is then
  // handed to destroyOperatorAndDownstreams(); if that routine also invokes the registered close
  // function on the operator's info, pInfo would be released twice - confirm.
  if (pInfo != NULL) destroyForecastInfo(pInfo);
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
  pTaskInfo->code = code;
  return code;
}
1215

1216
static void destroyColFutureData(void* p) {
2✔
1217
  SColFutureData* pData = p;
2✔
1218
  taosMemoryFree(pData->pName);
2!
1219
  colDataDestroy(&pData->data);
2✔
1220
}
2✔
1221

1222
static void destroyForecastInfo(void* param) {
41✔
1223
  SForecastOperatorInfo* pInfo = (SForecastOperatorInfo*)param;
41✔
1224

1225
  blockDataDestroy(pInfo->pRes);
41✔
1226
  pInfo->pRes = NULL;
41✔
1227

1228
  taosArrayDestroy(pInfo->forecastSupp.pCovariateSlotList);
41✔
1229
  pInfo->forecastSupp.pCovariateSlotList = NULL;
41✔
1230

1231
  taosArrayDestroyEx(pInfo->forecastSupp.pDynamicRealList, destroyColFutureData);
41✔
1232
  pInfo->forecastSupp.pDynamicRealList = NULL;
41✔
1233

1234
  taosMemoryFree(pInfo->forecastSupp.pOptions);
41!
1235

1236
  cleanupExprSupp(&pInfo->scalarSup);
41✔
1237
  taosAnalyBufDestroy(&pInfo->forecastSupp.analyBuf);
41✔
1238
  taosMemoryFreeClear(param);
41!
1239
}
41✔
1240

1241
#else
1242

1243
// Stub used when the build does not define USE_ANALYTICS: the forecast operator is unavailable, so
// no operator is created and TSDB_CODE_OPS_NOT_SUPPORT is returned. None of the arguments is used.
int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
                                   SOperatorInfo** pOptrInfo) {
  (void)downstream;
  (void)pPhyNode;
  (void)pTaskInfo;
  (void)pOptrInfo;

  return TSDB_CODE_OPS_NOT_SUPPORT;
}
1247

1248
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc