• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4513

17 Jul 2025 02:02AM UTC coverage: 31.359% (-31.1%) from 62.446%
#4513

push

travis-ci

web-flow
Merge pull request #31914 from taosdata/fix/3.0/compare-ans-failed

fix:Convert line endings from LF to CRLF for ans file

68541 of 301034 branches covered (22.77%)

Branch coverage included in aggregate %.

117356 of 291771 relevant lines covered (40.22%)

602262.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/libs/executor/src/forecastoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "querytask.h"
21
#include "tanalytics.h"
22
#include "taoserror.h"
23
#include "tcommon.h"
24
#include "tdatablock.h"
25
#include "tmsg.h"
26

27
#ifdef USE_ANALYTICS
28

29
#define ALGO_OPT_RETCONF_NAME      "return_conf"
30
#define ALGO_OPT_FORECASTROWS_NAME "rows"
31
#define ALGO_OPT_CONF_NAME         "conf"
32
#define ALGO_OPT_START_NAME        "start"
33
#define ALGO_OPT_EVERY_NAME        "every"
34

35
typedef struct {
36
  char*           pName;
37
  SColumnInfoData data;
38
  SColumn         col;
39
  int32_t         numOfRows;
40
} SColFutureData;
41

42
typedef struct {
43
  char         algoName[TSDB_ANALYTIC_ALGO_NAME_LEN];
44
  char         algoUrl[TSDB_ANALYTIC_ALGO_URL_LEN];
45
  char*        pOptions;
46
  int64_t      maxTs;
47
  int64_t      minTs;
48
  int64_t      numOfRows;
49
  uint64_t     groupId;
50
  int64_t      forecastRows;
51
  int64_t      cachedRows;
52
  int32_t      numOfBlocks;
53
  int64_t      timeout;
54
  int16_t      resTsSlot;
55
  int16_t      resValSlot;
56
  int16_t      resLowSlot;
57
  int16_t      resHighSlot;
58
  int16_t      inputTsSlot;
59
  int16_t      targetValSlot;
60
  int8_t       targetValType;
61
  int8_t       inputPrecision;
62
  int8_t       wncheck;
63
  double       conf;
64
  int64_t      startTs;
65
  int64_t      every;
66
  int8_t       setStart;
67
  int8_t       setEvery;
68
  SArray*      pCovariateSlotList;   // covariate slot list
69
  SArray*      pDynamicRealList;     // dynamic real data list
70
  int32_t      numOfInputCols;
71
  SAnalyticBuf analyBuf;
72
} SForecastSupp;
73

74
typedef struct SForecastOperatorInfo {
75
  SSDataBlock*  pRes;
76
  SExprSupp     scalarSup;  // scalar calculation
77
  SForecastSupp forecastSupp;
78
} SForecastOperatorInfo;
79

80
static void destroyForecastInfo(void* param);
81
static int32_t forecastParseOpt(SForecastSupp* pSupp, const char* id);
82

83
/*
 * Make sure the block can take more rows, but only when it is already full.
 *
 * When pBlock still has at least one free row nothing is done. Otherwise
 * blockDataEnsureCapacity() is invoked with newRowsNum; note that newRowsNum is
 * forwarded unchanged, it is not added to the current number of rows.
 *
 * Returns TSDB_CODE_SUCCESS or the error reported by blockDataEnsureCapacity().
 */
static FORCE_INLINE int32_t forecastEnsureBlockCapacity(SSDataBlock* pBlock, int32_t newRowsNum) {
  bool isFull = !(pBlock->info.rows < pBlock->info.capacity);

  if (isFull) {
    int32_t ret = blockDataEnsureCapacity(pBlock, newRowsNum);
    if (ret != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(ret));
      return ret;
    }
  }

  return TSDB_CODE_SUCCESS;
}
96

97
/*
 * Append every row of one input block to the analytic buffer of the current group.
 *
 * For each row the values are written in a fixed column order: primary
 * timestamp, forecast target, every entry of pDynamicRealList, then every entry
 * of pCovariateSlotList. pSupp->minTs / maxTs / numOfRows are updated per row and
 * pSupp->numOfBlocks is incremented once per call.
 *
 * @param pSupp   forecast state owning the analytic buffer
 * @param pBlock  input block to cache
 * @param id      task id used as log prefix
 * @return TSDB_CODE_SUCCESS on success;
 *         TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS when pSupp->cachedRows exceeds ANALY_FORECAST_MAX_ROWS;
 *         the error of a failed buffer write;
 *         an error when a column required by the slot mapping is missing from a non-empty block
 *         (previously such a block was skipped silently and success was reported).
 */
static int32_t forecastCacheBlock(SForecastSupp* pSupp, SSDataBlock* pBlock, const char* id) {
  int32_t       code = TSDB_CODE_SUCCESS;
  SAnalyticBuf* pBuf = &pSupp->analyBuf;

  if (pSupp->cachedRows > ANALY_FORECAST_MAX_ROWS) {
    code = TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS;
    qError("%s rows:%" PRId64 " for forecast cache, error happens, code:%s, upper limit:%d", id, pSupp->cachedRows,
           tstrerror(code), ANALY_FORECAST_MAX_ROWS);
    return code;
  }

  pSupp->numOfBlocks++;
  qDebug("%s block:%d, %p rows:%" PRId64, id, pSupp->numOfBlocks, pBlock, pBlock->info.rows);

  // nothing to cache, keep the behavior of an empty block unchanged
  if (pBlock->info.rows <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  // the column lookups do not depend on the row, resolve them once per block
  SColumnInfoData* pValCol = taosArrayGet(pBlock->pDataBlock, pSupp->targetValSlot);
  SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSupp->inputTsSlot);
  if (pTsCol == NULL || pValCol == NULL) {
    code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_INVALID_PARA;
    qError("%s failed to get ts/target column from input block, ts slot:%d, target slot:%d, code:%s", id,
           pSupp->inputTsSlot, pSupp->targetValSlot, tstrerror(code));
    return code;
  }

  int16_t valType = pValCol->info.type;
  int32_t numOfDynamic = (int32_t)taosArrayGetSize(pSupp->pDynamicRealList);
  int32_t numOfCovariate = (int32_t)taosArrayGetSize(pSupp->pCovariateSlotList);

  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
    int32_t index = 0;
    int64_t ts = ((TSKEY*)pTsCol->pData)[j];
    char*   val = colDataGetData(pValCol, j);

    pSupp->minTs = MIN(pSupp->minTs, ts);
    pSupp->maxTs = MAX(pSupp->maxTs, ts);
    pSupp->numOfRows++;

    // write the primary time stamp column data
    code = taosAnalyBufWriteColData(pBuf, index++, TSDB_DATA_TYPE_TIMESTAMP, &ts);
    if (TSDB_CODE_SUCCESS != code) {
      qError("%s failed to write ts in buf, code:%s", id, tstrerror(code));
      return code;
    }

    // write the main column for forecasting
    code = taosAnalyBufWriteColData(pBuf, index++, valType, val);
    if (TSDB_CODE_SUCCESS != code) {
      qError("%s failed to write val in buf, code:%s", id, tstrerror(code));
      return code;
    }

    // let's handle the dynamic_real_columns
    for (int32_t i = 0; i < numOfDynamic; ++i) {
      SColFutureData*  pCol = taosArrayGet(pSupp->pDynamicRealList, i);
      SColumnInfoData* pColData = (pCol != NULL) ? taosArrayGet(pBlock->pDataBlock, pCol->col.slotId) : NULL;
      if (pColData == NULL) {
        code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_INVALID_PARA;
        qError("%s failed to get the %d-th dynamic real column, code:%s", id, i, tstrerror(code));
        return code;
      }

      char* pVal = colDataGetData(pColData, j);
      code = taosAnalyBufWriteColData(pBuf, index++, pCol->col.type, pVal);
      if (TSDB_CODE_SUCCESS != code) {
        qError("%s failed to write val in buf, code:%s", id, tstrerror(code));
        return code;
      }
    }

    // now handle the past_dynamic_real columns
    for (int32_t i = 0; i < numOfCovariate; ++i) {
      SColumn*         pCol = taosArrayGet(pSupp->pCovariateSlotList, i);
      SColumnInfoData* pColData = (pCol != NULL) ? taosArrayGet(pBlock->pDataBlock, pCol->slotId) : NULL;
      if (pColData == NULL) {
        code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_INVALID_PARA;
        qError("%s failed to get the %d-th covariate column, code:%s", id, i, tstrerror(code));
        return code;
      }

      char* pVal = colDataGetData(pColData, j);
      code = taosAnalyBufWriteColData(pBuf, index++, pCol->type, pVal);
      if (TSDB_CODE_SUCCESS != code) {
        qError("%s failed to write val in buf, code:%s", id, tstrerror(code));
        return code;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
169

170
/*
 * Finish the analytic buffer of the current group.
 *
 * Steps, in order:
 *  1. for every input column, append the data kept in pDynamicRealList (columns
 *     2 .. 2 + size - 1 only) and close the column;
 *  2. close the data section;
 *  3. write the options: option string, algorithm, precision, wncheck,
 *     return_conf, forecast_rows, conf, every, start and, when covariates are
 *     present, type=covariate;
 *  4. close the buffer.
 *
 * When the user did not set "every" it is derived from the cached rows as
 * (maxTs - minTs) / (numOfRows - 1); when "start" is not set it is maxTs + every.
 *
 * @param pSupp  forecast state owning the analytic buffer
 * @param id     task id used as log prefix
 * @return 0 on success, TSDB_CODE_ANA_ANODE_NOT_ENOUGH_ROWS when too few rows
 *         were cached, otherwise the error of the failed buffer operation.
 */
static int32_t forecastCloseBuf(SForecastSupp* pSupp, const char* id) {
  SAnalyticBuf* pBuf = &pSupp->analyBuf;
  int32_t       code = 0;
  int32_t       numOfDynamic = (int32_t)taosArrayGetSize(pSupp->pDynamicRealList);

  for (int32_t i = 0; i < pSupp->numOfInputCols; ++i) {
    // column 0 and 1 are the primary timestamp and the forecast column,
    // the dynamic real columns follow them: add their future data
    if ((i >= 2) && ((i - 2) < numOfDynamic)) {
      SColFutureData* pCol = taosArrayGet(pSupp->pDynamicRealList, i - 2);
      int16_t         valType = pCol->col.type;
      for (int32_t j = 0; j < pCol->numOfRows; ++j) {
        char* pVal = colDataGetData(&pCol->data, j);
        code = taosAnalyBufWriteColData(pBuf, i, valType, pVal);
        if (code != 0) {
          qError("%s failed to write future data in buf, code:%s", id, tstrerror(code));
          return code;
        }
      }
    }

    code = taosAnalyBufWriteColEnd(pBuf, i);
    if (code != 0) return code;
  }

  code = taosAnalyBufWriteDataEnd(pBuf);
  if (code != 0) return code;

  code = taosAnalyBufWriteOptStr(pBuf, "option", pSupp->pOptions);
  if (code != 0) return code;

  code = taosAnalyBufWriteOptStr(pBuf, "algo", pSupp->algoName);
  if (code != 0) return code;

  const char* prec = TSDB_TIME_PRECISION_MILLI_STR;
  if (pSupp->inputPrecision == TSDB_TIME_PRECISION_MICRO) prec = TSDB_TIME_PRECISION_MICRO_STR;
  if (pSupp->inputPrecision == TSDB_TIME_PRECISION_NANO) prec = TSDB_TIME_PRECISION_NANO_STR;
  code = taosAnalyBufWriteOptStr(pBuf, "prec", prec);
  if (code != 0) return code;

  code = taosAnalyBufWriteOptInt(pBuf, ALGO_OPT_WNCHECK_NAME, pSupp->wncheck);
  if (code != 0) return code;

  // the confidence interval is requested only if a low or high output column exists
  bool noConf = (pSupp->resHighSlot == -1 && pSupp->resLowSlot == -1);
  code = taosAnalyBufWriteOptInt(pBuf, ALGO_OPT_RETCONF_NAME, !noConf);
  if (code != 0) return code;

  if (pSupp->cachedRows < ANALY_FORECAST_MIN_ROWS) {
    // report the value that was actually checked (cachedRows, not forecastRows)
    qError("%s history rows for forecasting not enough, min required:%d, current:%" PRId64, id, ANALY_FORECAST_MIN_ROWS,
           pSupp->cachedRows);
    return TSDB_CODE_ANA_ANODE_NOT_ENOUGH_ROWS;
  }

  code = taosAnalyBufWriteOptInt(pBuf, "forecast_rows", pSupp->forecastRows);
  if (code != 0) return code;

  code = taosAnalyBufWriteOptFloat(pBuf, ALGO_OPT_CONF_NAME, pSupp->conf);
  if (code != 0) return code;

  int64_t every = 0;
  if (pSupp->setEvery != 0) {
    every = pSupp->every;
  } else {
    // at least two rows are required to derive the interval, avoid dividing by zero
    if (pSupp->numOfRows < 2) {
      qError("%s failed to derive the forecast interval, rows in buf:%" PRId64, id, pSupp->numOfRows);
      return TSDB_CODE_ANA_ANODE_NOT_ENOUGH_ROWS;
    }
    every = (pSupp->maxTs - pSupp->minTs) / (pSupp->numOfRows - 1);
  }

  code = taosAnalyBufWriteOptInt(pBuf, ALGO_OPT_EVERY_NAME, every);
  if (code != 0) return code;

  int64_t start = (pSupp->setStart != 0) ? pSupp->startTs : pSupp->maxTs + every;
  code = taosAnalyBufWriteOptInt(pBuf, ALGO_OPT_START_NAME, start);
  if (code != 0) return code;

  if (taosArrayGetSize(pSupp->pCovariateSlotList) + taosArrayGetSize(pSupp->pDynamicRealList) > 0) {
    code = taosAnalyBufWriteOptStr(pBuf, "type", "covariate");
    if (code != 0) return code;
  }

  code = taosAnalyBufClose(pBuf);
  return code;
}
243

244
/*
 * Send the closed analytic buffer to the analysis node and append the returned
 * forecast to pBlock.
 *
 * The response is expected to be a json object holding
 *   "rows": number of forecast rows (negative on failure, with "msg" set),
 *   "res" : array of 2 arrays [ts, value], or of 4 arrays [ts, value, low, high]
 *           when a low/high output column is requested.
 * Every inner array must have exactly "rows" elements. The value column is
 * written as double, low/high as float, ts as int64.
 *
 * @param pSupp   forecast state (slot mapping, url, timeout, buffer)
 * @param pBlock  result block, rows are appended after pBlock->info.rows
 * @param pId     task id used as log prefix
 * @return 0 on success; terrno when the value column or the response is not
 *         available; TSDB_CODE_ANA_ANODE_RETURN_ERROR when the node reports a
 *         failure; TSDB_CODE_INVALID_JSON_FORMAT (or the pending error code)
 *         when the response is malformed or the block can not be enlarged.
 */
static int32_t forecastAnalysis(SForecastSupp* pSupp, SSDataBlock* pBlock, const char* pId) {
  SAnalyticBuf* pBuf = &pSupp->analyBuf;
  int32_t       resCurRow = pBlock->info.rows;
  int64_t       tmpI64 = 0;
  float         tmpFloat = 0;
  double        tmpDouble = 0;
  int32_t       code = 0;

  SColumnInfoData* pResValCol = taosArrayGet(pBlock->pDataBlock, pSupp->resValSlot);
  if (NULL == pResValCol) {
    return terrno;
  }

  SColumnInfoData* pResTsCol = ((pSupp->resTsSlot != -1) ? taosArrayGet(pBlock->pDataBlock, pSupp->resTsSlot) : NULL);
  SColumnInfoData* pResLowCol =
      ((pSupp->resLowSlot != -1) ? taosArrayGet(pBlock->pDataBlock, pSupp->resLowSlot) : NULL);
  SColumnInfoData* pResHighCol =
      (pSupp->resHighSlot != -1 ? taosArrayGet(pBlock->pDataBlock, pSupp->resHighSlot) : NULL);

  SJson* pJson = taosAnalySendReqRetJson(pSupp->algoUrl, ANALYTICS_HTTP_TYPE_POST, pBuf, pSupp->timeout);
  if (pJson == NULL) {
    return terrno;
  }

  int32_t rows = 0;
  tjsonGetInt32ValueFromDouble(pJson, "rows", rows, code);
  if (rows < 0 && code == 0) {
    // the analysis node reports a failure, the reason is carried in "msg"
    char pMsg[1024] = {0};
    code = tjsonGetStringValue(pJson, "msg", pMsg);
    if (code != 0) {
      qError("%s failed to get msg from rsp, unknown error", pId);
    } else {
      qError("%s failed to exec forecast, msg:%s", pId, pMsg);
    }

    tjsonDelete(pJson);
    return TSDB_CODE_ANA_ANODE_RETURN_ERROR;
  }

  if (code != 0) {
    goto _OVER;
  }

  SJson* res = tjsonGetObjectItem(pJson, "res");
  if (res == NULL) goto _OVER;
  int32_t ressize = tjsonGetArraySize(res);
  bool    returnConf = (pSupp->resHighSlot != -1 || pSupp->resLowSlot != -1);

  if ((returnConf && (ressize != 4)) || ((!returnConf) && (ressize != 2))) {
    goto _OVER;
  }

  // the number of rows comes from the remote node: make sure the result block
  // can hold all of them before any column is written
  if (rows > 0) {
    code = blockDataEnsureCapacity(pBlock, (int32_t)(pBlock->info.rows + rows));
    if (code != 0) goto _OVER;
  }

  if (pResTsCol != NULL) {
    resCurRow = pBlock->info.rows;
    SJson* tsJsonArray = tjsonGetArrayItem(res, 0);
    if (tsJsonArray == NULL) goto _OVER;
    int32_t tsSize = tjsonGetArraySize(tsJsonArray);
    if (tsSize != rows) goto _OVER;
    for (int32_t i = 0; i < tsSize; ++i) {
      SJson* tsJson = tjsonGetArrayItem(tsJsonArray, i);
      tjsonGetObjectValueBigInt(tsJson, &tmpI64);
      colDataSetInt64(pResTsCol, resCurRow, &tmpI64);
      resCurRow++;
    }
  }

  if (pResLowCol != NULL) {
    resCurRow = pBlock->info.rows;
    SJson* lowJsonArray = tjsonGetArrayItem(res, 2);
    if (lowJsonArray == NULL) goto _OVER;
    int32_t lowSize = tjsonGetArraySize(lowJsonArray);
    if (lowSize != rows) goto _OVER;
    for (int32_t i = 0; i < lowSize; ++i) {
      SJson* lowJson = tjsonGetArrayItem(lowJsonArray, i);
      tjsonGetObjectValueDouble(lowJson, &tmpDouble);
      tmpFloat = (float)tmpDouble;
      colDataSetFloat(pResLowCol, resCurRow, &tmpFloat);
      resCurRow++;
    }
  }

  if (pResHighCol != NULL) {
    resCurRow = pBlock->info.rows;
    SJson* highJsonArray = tjsonGetArrayItem(res, 3);
    if (highJsonArray == NULL) goto _OVER;
    int32_t highSize = tjsonGetArraySize(highJsonArray);
    if (highSize != rows) goto _OVER;
    for (int32_t i = 0; i < highSize; ++i) {
      SJson* highJson = tjsonGetArrayItem(highJsonArray, i);
      tjsonGetObjectValueDouble(highJson, &tmpDouble);
      tmpFloat = (float)tmpDouble;
      colDataSetFloat(pResHighCol, resCurRow, &tmpFloat);
      resCurRow++;
    }
  }

  resCurRow = pBlock->info.rows;
  SJson* valJsonArray = tjsonGetArrayItem(res, 1);
  if (valJsonArray == NULL) goto _OVER;
  int32_t valSize = tjsonGetArraySize(valJsonArray);
  if (valSize != rows) goto _OVER;
  for (int32_t i = 0; i < valSize; ++i) {
    SJson* valJson = tjsonGetArrayItem(valJsonArray, i);
    tjsonGetObjectValueDouble(valJson, &tmpDouble);

    colDataSetDouble(pResValCol, resCurRow, &tmpDouble);
    resCurRow++;
  }

  // the new rows become visible only after all columns are filled
  pBlock->info.rows += rows;

  tjsonDelete(pJson);
  return 0;

_OVER:
  tjsonDelete(pJson);
  if (code == 0) {
    code = TSDB_CODE_INVALID_JSON_FORMAT;
  }

  qError("%s failed to perform forecast finalize since %s", pId, tstrerror(code));
  return code;
}
370

371
/*
 * Run the forecast for the group cached so far: close the analytic buffer,
 * make room in the result block and fetch the forecast into it.
 *
 * Whatever the outcome, the block counter is reset and the analytic buffer is
 * destroyed before returning.
 * NOTE(review): minTs / maxTs / numOfRows of pSupp are not reset here - confirm
 * whether that is intended when several groups are processed.
 *
 * Returns TSDB_CODE_SUCCESS or the error of the first failing step.
 */
static int32_t forecastAggregateBlocks(SForecastSupp* pSupp, SSDataBlock* pResBlock, const char* pId) {
  int32_t lino = 0;
  int32_t code = forecastCloseBuf(pSupp, pId);
  QUERY_CHECK_CODE(code, lino, _end);

  code = forecastEnsureBlockCapacity(pResBlock, 1);
  QUERY_CHECK_CODE(code, lino, _end);

  code = forecastAnalysis(pSupp, pResBlock, pId);
  QUERY_CHECK_CODE(code, lino, _end);

  uInfo("%s block:%d, forecast finalize", pId, pSupp->numOfBlocks);

_end:
  // cleanup is shared by the success and the failure path
  pSupp->numOfBlocks = 0;
  taosAnalyBufDestroy(&pSupp->analyBuf);
  return code;
}
392

393
/*
 * Produce the next result block of the forecast operator.
 *
 * Blocks are pulled from downstream and cached as long as they belong to the
 * same group. When a block of another group shows up, the forecast of the
 * finished group is computed first and the new block starts the next group.
 * As soon as the result block holds rows it is handed to the caller. Once the
 * downstream is exhausted the last group is forecast.
 *
 * On failure the error is stored in the task info and the function leaves via
 * T_LONG_JMP. *ppRes is set to NULL when no result row is available.
 */
static int32_t forecastNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                code = TSDB_CODE_SUCCESS;
  int32_t                lino = 0;
  SExecTaskInfo*         pTaskInfo = pOperator->pTaskInfo;
  SForecastOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*           pResBlock = pInfo->pRes;
  SForecastSupp*         pSupp = &pInfo->forecastSupp;
  int64_t                st = taosGetTimestampUs();
  int32_t                numOfBlocks = pSupp->numOfBlocks;
  const char*            pId = GET_TASKID(pOperator->pTaskInfo);

  blockDataCleanup(pResBlock);

  for (;;) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      break;
    }

    bool sameGroup = (pSupp->groupId == 0) || (pSupp->groupId == pBlock->info.id.groupId);
    if (!sameGroup) {
      // the previous group is complete, forecast it before switching to the new one
      qDebug("%s group:%" PRId64 ", read finish for new group coming, blocks:%d", pId, pSupp->groupId, numOfBlocks);
      code = forecastAggregateBlocks(pSupp, pResBlock, pId);
      QUERY_CHECK_CODE(code, lino, _end);

      pSupp->groupId = pBlock->info.id.groupId;
      numOfBlocks = 1;
      pSupp->cachedRows = pBlock->info.rows;
      qDebug("%s group:%" PRId64 ", new group, rows:%" PRId64 ", total rows:%" PRId64, pId, pSupp->groupId,
             pBlock->info.rows, pSupp->cachedRows);
    } else {
      pSupp->groupId = pBlock->info.id.groupId;
      numOfBlocks++;
      pSupp->cachedRows += pBlock->info.rows;
      qDebug("%s group:%" PRId64 ", blocks:%d, rows:%" PRId64 ", total rows:%" PRId64, pId, pSupp->groupId, numOfBlocks,
             pBlock->info.rows, pSupp->cachedRows);
    }

    code = forecastCacheBlock(pSupp, pBlock, pId);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pResBlock->info.rows > 0) {
      (*ppRes) = pResBlock;
      qDebug("%s group:%" PRId64 ", return to upstream, blocks:%d", pId, pResBlock->info.id.groupId, numOfBlocks);
      return code;
    }
  }

  // downstream is exhausted, forecast the group that is still cached
  if (numOfBlocks > 0) {
    qDebug("%s group:%" PRId64 ", read finish, blocks:%d", pId, pSupp->groupId, numOfBlocks);
    code = forecastAggregateBlocks(pSupp, pResBlock, pId);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  int64_t cost = taosGetTimestampUs() - st;
  qDebug("%s all groups finished, cost:%" PRId64 "us", pId, cost);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s %s failed at line %d since %s", pId, __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  if (pResBlock->info.rows == 0) {
    (*ppRes) = NULL;
  } else {
    (*ppRes) = pResBlock;
  }
  return code;
}
460

461
/*
 * Map the forecast related output expressions to their result slots.
 *
 * All four slots start as -1 ("not requested"); each expression whose function
 * type is FORECAST, FORECAST_ROWTS, FORECAST_LOW or FORECAST_HIGH stores its
 * result slot id in the matching field. Other expressions are ignored.
 * Always returns 0.
 */
static int32_t forecastParseOutput(SForecastSupp* pSupp, SExprSupp* pExprSup) {
  pSupp->resValSlot = -1;
  pSupp->resTsSlot = -1;
  pSupp->resLowSlot = -1;
  pSupp->resHighSlot = -1;

  for (int32_t idx = 0; idx < pExprSup->numOfExprs; ++idx) {
    SExprInfo* pExpr = &pExprSup->pExprInfo[idx];
    int32_t    slot = pExpr->base.resSchema.slotId;

    switch (pExpr->pExpr->_function.functionType) {
      case FUNCTION_TYPE_FORECAST:
        pSupp->resValSlot = slot;
        break;
      case FUNCTION_TYPE_FORECAST_ROWTS:
        pSupp->resTsSlot = slot;
        break;
      case FUNCTION_TYPE_FORECAST_LOW:
        pSupp->resLowSlot = slot;
        break;
      case FUNCTION_TYPE_FORECAST_HIGH:
        pSupp->resHighSlot = slot;
        break;
      default:
        // not a forecast output, nothing to record
        break;
    }
  }

  return 0;
}
484

485
/*
 * Sanity check of the parameter list of a forecast function node.
 *
 * Accepted layouts:
 *   column_name, timestamp_column_name
 *   column_name_#1, column_name_#2...., analytics_options, timestamp_column_name, primary_key_column if exists
 *   column_name_#1, timestamp_column_name, primary_key_column if exists
 *   column_name_#1, analytics_options, timestamp_column_name
 *
 * Checked here: at least two parameters, no NULL parameter, and the first and
 * the last parameter are column nodes.
 *
 * Returns 0 when valid, TSDB_CODE_PLAN_INTERNAL_ERROR otherwise.
 */
static int32_t validInputParams(SFunctionNode* pFunc, const char* id) {
  int32_t code = 0;
  int32_t num = LIST_LENGTH(pFunc->pParameterList);
  SNode*  pFirst = NULL;
  SNode*  pLast = NULL;

  if (num <= 1) {
    qError("%s invalid number of parameters:%d", id, num);
    code = TSDB_CODE_PLAN_INTERNAL_ERROR;
    goto _end;
  }

  for (int32_t idx = 0; idx < num; ++idx) {
    if (nodesListGetNode(pFunc->pParameterList, idx) == NULL) {
      code = TSDB_CODE_PLAN_INTERNAL_ERROR;
      qError("%s %d-th parameter in forecast function is NULL, code:%s", id, idx, tstrerror(code));
      goto _end;
    }
  }

  pFirst = nodesListGetNode(pFunc->pParameterList, 0);
  pLast = nodesListGetNode(pFunc->pParameterList, num - 1);

  if (num == 2) {
    // column_name, timestamp_column_name
    if (nodeType(pFirst) != QUERY_NODE_COLUMN || nodeType(pLast) != QUERY_NODE_COLUMN) {
      qError("%s invalid column type, column 1:%d, column 2:%d", id, nodeType(pFirst), nodeType(pLast));
      code = TSDB_CODE_PLAN_INTERNAL_ERROR;
      goto _end;
    }
  } else {
    // three or more parameters: only the first and the last one are checked
    if (nodeType(pFirst) != QUERY_NODE_COLUMN) {
      code = TSDB_CODE_PLAN_INTERNAL_ERROR;
      qError("%s first parameter is not valid column in forecast function", id);
      goto _end;
    }

    if (nodeType(pLast) != QUERY_NODE_COLUMN) {
      qError("%s last parameter is not valid column, actual:%d", id, nodeType(pLast));
      code = TSDB_CODE_PLAN_INTERNAL_ERROR;
    }
  }

_end:
  if (code) {
    qError("%s valid the parameters failed, code:%s", id, tstrerror(code));
  }
  return code;
}
537

538
/* Tell whether a column with the given slot id is already recorded in pSupp->pCovariateSlotList. */
static bool existInList(SForecastSupp* pSupp, int32_t slotId) {
  int32_t idx = 0;

  while (idx < taosArrayGetSize(pSupp->pCovariateSlotList)) {
    SColumn* pEntry = taosArrayGet(pSupp->pCovariateSlotList, idx);
    if (pEntry->slotId == slotId) {
      return true;
    }
    idx++;
  }

  return false;
}
549

550
/*
 * Extract the input description of the forecast function from the target list.
 *
 * For every FORECAST function found in pFuncs the following is recorded in pSupp:
 *  - the slot and type of the target column (first parameter);
 *  - the slot and precision of the timestamp column (second parameter in the
 *    two-parameter form, otherwise the first primary timestamp column found);
 *  - the option string (first value parameter, "algo=holtwinters" if absent);
 *  - for the multi-parameter form, the covariate columns: the column
 *    parameters following the target, up to the first non-column parameter or
 *    the primary timestamp column; duplicates and the target itself are skipped.
 * numOfInputCols is 2 (ts + target) plus the number of covariate columns.
 *
 * NOTE(review): when pFuncs holds more than one FORECAST function the option
 * string and the lists of a previous one are overwritten without being
 * released here - confirm that this can not happen.
 *
 * @return 0 on success, the validation error of validInputParams(), or terrno
 *         when duplicating the options or growing/creating a list fails.
 */
static int32_t forecastParseInput(SForecastSupp* pSupp, SNodeList* pFuncs, const char* id) {
  int32_t code = 0;
  SNode*  pNode = NULL;

  pSupp->inputTsSlot = -1;
  pSupp->targetValSlot = -1;
  pSupp->targetValType = -1;
  pSupp->inputPrecision = -1;

  FOREACH(pNode, pFuncs) {
    if ((nodeType(pNode) == QUERY_NODE_TARGET) && (nodeType(((STargetNode*)pNode)->pExpr) == QUERY_NODE_FUNCTION)) {
      SFunctionNode* pFunc = (SFunctionNode*)((STargetNode*)pNode)->pExpr;
      int32_t        numOfParam = LIST_LENGTH(pFunc->pParameterList);

      if (pFunc->funcType == FUNCTION_TYPE_FORECAST) {
        code = validInputParams(pFunc, id);
        if (code) {
          return code;
        }

        pSupp->numOfInputCols = 2;

        if (numOfParam == 2) {
          // column, ts
          SColumnNode* pTarget = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
          SColumnNode* pTsNode = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 1);

          pSupp->inputTsSlot = pTsNode->slotId;
          pSupp->inputPrecision = pTsNode->node.resType.precision;
          pSupp->targetValSlot = pTarget->slotId;
          pSupp->targetValType = pTarget->node.resType.type;

          // let's add the holtwinters as the default forecast algorithm
          pSupp->pOptions = taosStrdup("algo=holtwinters");
          if (pSupp->pOptions == NULL) {
            code = terrno;
            qError("%s failed to dup forecast option, code:%s", id, tstrerror(code));
            return code;
          }
        } else {
          SColumnNode* pTarget = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
          bool         assignTs = false;
          bool         assignOpt = false;

          pSupp->targetValSlot = pTarget->slotId;
          pSupp->targetValType = pTarget->node.resType.type;

          // set the primary ts column and option info, only the first of each kind is used
          for (int32_t i = 0; i < numOfParam; ++i) {
            SNode* pParam = nodesListGetNode(pFunc->pParameterList, i);
            if (nodeType(pParam) == QUERY_NODE_COLUMN) {
              SColumnNode* pColNode = (SColumnNode*)pParam;
              if (pColNode->isPrimTs && (!assignTs)) {
                pSupp->inputTsSlot = pColNode->slotId;
                pSupp->inputPrecision = pColNode->node.resType.precision;
                assignTs = true;
              }
            } else if (nodeType(pParam) == QUERY_NODE_VALUE) {
              if (!assignOpt) {
                SValueNode* pOptNode = (SValueNode*)pParam;
                pSupp->pOptions = taosStrdup(pOptNode->literal);
                if (pSupp->pOptions == NULL) {
                  code = terrno;
                  qError("%s failed to dup forecast option, code:%s", id, tstrerror(code));
                  return code;
                }
                assignOpt = true;
              }
            }
          }

          if (!assignOpt) {
            // set the default forecast option
            pSupp->pOptions = taosStrdup("algo=holtwinters");
            if (pSupp->pOptions == NULL) {
              code = terrno;
              qError("%s failed to dup forecast option, code:%s", id, tstrerror(code));
              return code;
            }
          }

          pSupp->pCovariateSlotList = taosArrayInit(4, sizeof(SColumn));
          pSupp->pDynamicRealList = taosArrayInit(4, sizeof(SColFutureData));
          if (pSupp->pCovariateSlotList == NULL || pSupp->pDynamicRealList == NULL) {
            code = terrno;
            qError("%s failed to create the covariate lists, code:%s", id, tstrerror(code));
            return code;
          }

          // the first is the target column
          for (int32_t i = 1; i < numOfParam; ++i) {
            SColumnNode* p = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, i);
            // the covariate columns end at the options or at the primary timestamp column
            if ((nodeType(p) != QUERY_NODE_COLUMN) || p->isPrimTs) {
              break;
            }

            if (p->slotId == pSupp->targetValSlot) {
              continue;  // duplicate the target column, ignore it
            }

            if (existInList(pSupp, p->slotId)) {
              continue;  // duplicate column, ignore it
            }

            SColumn col = {.slotId = p->slotId,
                           .colType = p->colType,
                           .type = p->node.resType.type,
                           .bytes = p->node.resType.bytes};

            tstrncpy(col.name, p->colName, tListLen(col.name));
            void* pRet = taosArrayPush(pSupp->pCovariateSlotList, &col);
            if (pRet == NULL) {
              // stop right away, a partially recorded covariate list must not be used
              code = terrno;
              qError("failed to record the covariate slot index, since %s", tstrerror(code));
              return code;
            }
          }

          pSupp->numOfInputCols += taosArrayGetSize(pSupp->pCovariateSlotList);
        }
      }
    }
  }

  return code;
}
666

667
/*
 * Reset the per-query forecast options and statistics to their defaults:
 * empty timestamp range, no cached row, default wncheck / rows / conf, and
 * neither "start" nor "every" set by the user.
 */
static void initForecastOpt(SForecastSupp* pSupp) {
  // statistics of the cached rows
  pSupp->minTs = INT64_MAX;
  pSupp->maxTs = 0;
  pSupp->numOfRows = 0;

  // algorithm options
  pSupp->forecastRows = ANALY_FORECAST_DEFAULT_ROWS;
  pSupp->conf = ANALY_FORECAST_DEFAULT_CONF;
  pSupp->wncheck = ANALY_FORECAST_DEFAULT_WNCHECK;

  // user supplied start/every are not available yet
  pSupp->setStart = 0;
  pSupp->setEvery = 0;
}
677

678
/*
 * Reject algorithms that can not be combined with covariate columns.
 *
 * Returns TSDB_CODE_ANA_NOT_SUPPORT_FORECAST when at least one covariate column
 * is present and the algorithm name is (case-insensitively) "holtwinters",
 * "arima" or "timemoe-fc"; TSDB_CODE_SUCCESS otherwise.
 */
static int32_t filterNotSupportForecast(SForecastSupp* pSupp) {
  static const char* const noCovariateAlgos[] = {"holtwinters", "arima", "timemoe-fc"};

  if (taosArrayGetSize(pSupp->pCovariateSlotList) == 0) {
    return TSDB_CODE_SUCCESS;
  }

  for (size_t k = 0; k < sizeof(noCovariateAlgos) / sizeof(noCovariateAlgos[0]); ++k) {
    if (taosStrcasecmp(pSupp->algoName, noCovariateAlgos[k]) == 0) {
      return TSDB_CODE_ANA_NOT_SUPPORT_FORECAST;
    }
  }

  return TSDB_CODE_SUCCESS;
}
691

692

693
/*
 * Build one future covariate ("dynamic real") column and append it to
 * pSupp->pDynamicRealList.
 *
 * pCol  hash value holding the name of the covariate column (the
 *       "dynamic_real_N_col" option)
 * pVal  hash value holding the future data, a space separated list whose first
 *       token carries a leading '[' (the "dynamic_real_N" option)
 * id    task id used as log prefix
 *
 * The number of values must equal pSupp->forecastRows and the column must be
 * present in pSupp->pCovariateSlotList. On success the name and the column
 * buffer are owned by the element stored in pDynamicRealList; on failure every
 * intermediate allocation is released here.
 */
static int32_t forecastAddDynamicReal(SForecastSupp* pSupp, void* pCol, const char* pVal, const char* id) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        num = 0;
  char*          buf = NULL;
  char**         pList = NULL;
  SColumn*       pColx = NULL;
  SColFutureData d = {.pName = taosStrndupi(pCol, taosHashGetValueSize(pCol))};

  if (d.pName == NULL) {
    qError("%s failed to clone the future dynamic real column name:%s", id, (char*)pCol);
    code = terrno;
    goto _error;
  }

  // locate the covariate column that carries the requested name
  for (int32_t i = 0; i < taosArrayGetSize(pSupp->pCovariateSlotList); ++i) {
    SColumn* pCand = taosArrayGet(pSupp->pCovariateSlotList, i);
    if (strcmp(pCand->name, d.pName) == 0) {
      pColx = pCand;
      break;
    }
  }

  if (pColx == NULL) {
    qError("%s not found the required future dynamic real column:%s", id, d.pName);
    code = TSDB_CODE_INVALID_PARA;
    goto _error;
  }

  d.data.info.slotId = pColx->slotId;
  d.data.info.type = pColx->type;
  d.data.info.bytes = pColx->bytes;

  // work on a private, NUL-terminated copy since strsplit() modifies its input
  buf = taosStrndupi(pVal, taosHashGetValueSize((void*)pVal));
  if (buf == NULL) {
    code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    qError("%s failed to clone the future dynamic real column data, since %s", id, tstrerror(code));
    goto _error;
  }

  (void)strdequote(buf);

  pList = strsplit(buf, " ", &num);
  if (pList == NULL) {
    code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    qError("%s failed to split the future dynamic real column data, since %s", id, tstrerror(code));
    goto _error;
  }

  if (num != pSupp->forecastRows) {
    qError("%s the rows:%d of future dynamic real column data is not equalled to the forecasting rows:%" PRId64,
           id, num, pSupp->forecastRows);
    code = TSDB_CODE_INVALID_PARA;
    goto _error;
  }

  d.numOfRows = num;

  code = colInfoDataEnsureCapacity(&d.data, num, true);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed to allocate the future dynamic real column buffer, since %s", id, tstrerror(code));
    goto _error;
  }

  for (int32_t j = 0; j < num; ++j) {
    char* ps = pList[j];

    if (j == 0) {
      // the first token is expected to carry the opening bracket of the list
      char* pBracket = strstr(ps, "[");
      if (pBracket == NULL) {
        qError("%s invalid future dynamic real column data for column:%s, missing '['", id, d.pName);
        code = TSDB_CODE_INVALID_PARA;
        goto _error;
      }
      ps = pBracket + 1;
    }

    switch (pColx->type) {
      case TSDB_DATA_TYPE_TINYINT: {
        int8_t t1 = taosStr2Int8(ps, NULL, 10);
        colDataSetVal(&d.data, j, (const char*)&t1, false);
        break;
      }
      case TSDB_DATA_TYPE_SMALLINT: {
        int16_t t1 = taosStr2Int16(ps, NULL, 10);
        colDataSetVal(&d.data, j, (const char*)&t1, false);
        break;
      }
      case TSDB_DATA_TYPE_INT: {
        int32_t t1 = taosStr2Int32(ps, NULL, 10);
        colDataSetVal(&d.data, j, (const char*)&t1, false);
        break;
      }
      case TSDB_DATA_TYPE_BIGINT: {
        int64_t t1 = taosStr2Int64(ps, NULL, 10);
        colDataSetVal(&d.data, j, (const char*)&t1, false);
        break;
      }
      case TSDB_DATA_TYPE_FLOAT: {
        float t1 = taosStr2Float(ps, NULL);
        colDataSetVal(&d.data, j, (const char*)&t1, false);
        break;
      }
      case TSDB_DATA_TYPE_DOUBLE: {
        double t1 = taosStr2Double(ps, NULL);
        colDataSetVal(&d.data, j, (const char*)&t1, false);
        break;
      }
      case TSDB_DATA_TYPE_UTINYINT: {
        uint8_t t1 = taosStr2UInt8(ps, NULL, 10);
        colDataSetVal(&d.data, j, (const char*)&t1, false);
        break;
      }
      case TSDB_DATA_TYPE_USMALLINT: {
        uint16_t t1 = taosStr2UInt16(ps, NULL, 10);
        colDataSetVal(&d.data, j, (const char*)&t1, false);
        break;
      }
      case TSDB_DATA_TYPE_UINT: {
        uint32_t t1 = taosStr2UInt32(ps, NULL, 10);
        colDataSetVal(&d.data, j, (const char*)&t1, false);
        break;
      }
      case TSDB_DATA_TYPE_UBIGINT: {
        uint64_t t1 = taosStr2UInt64(ps, NULL, 10);
        colDataSetVal(&d.data, j, (const char*)&t1, false);
        break;
      }
      default:
        // other column types are left untouched, as before
        break;
    }
  }

  if (taosArrayPush(pSupp->pDynamicRealList, &d) == NULL) {
    qError("%s failed to add column info in dynamic real column info", id);
    code = terrno;
    goto _error;
  }

  // d.pName and d.data now belong to the element stored in pDynamicRealList
  taosMemoryFree(pList);
  taosMemoryFree(buf);
  return TSDB_CODE_SUCCESS;

_error:
  taosMemoryFree(pList);
  taosMemoryFree(buf);
  taosMemoryFree(d.pName);
  colDataDestroy(&d.data);
  return code;
}

/*
 * Parse the forecast option string (pSupp->pOptions) into pSupp.
 *
 * Handled options: the algorithm name/url, timeout, white-noise check,
 * "rows", "conf", "start", "every" and the "dynamic_real_N" /
 * "dynamic_real_N_col" pairs that describe future covariate data.
 *
 * id is the task id used as log prefix.
 *
 * Returns TSDB_CODE_SUCCESS, or an error code when the option string is empty,
 * the algorithm is unknown or cannot be combined with covariates, a numeric
 * option is malformed, too many rows are requested, or a future covariate
 * definition is inconsistent. The temporary option hash is always released
 * once it has been created.
 */
static int32_t forecastParseOpt(SForecastSupp* pSupp, const char* id) {
  int32_t   code = 0;
  int32_t   lino = 0;
  SHashObj* pHashMap = NULL;

  initForecastOpt(pSupp);

  code = taosAnalyGetOpts(pSupp->pOptions, &pHashMap);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  if (taosHashGetSize(pHashMap) == 0) {
    code = TSDB_CODE_INVALID_PARA;
    qError("%s no valid options for forecast, failed to exec", id);
    goto _end;  // release the (empty) hash map instead of leaking it
  }

  code = taosAnalysisParseAlgo(pSupp->pOptions, pSupp->algoName, pSupp->algoUrl, ANALY_ALGO_TYPE_FORECAST,
                               tListLen(pSupp->algoUrl), pHashMap, id);
  TSDB_CHECK_CODE(code, lino, _end);

  code = filterNotSupportForecast(pSupp);
  if (code) {
    qError("%s not support forecast model, %s", id, pSupp->algoName);
    TSDB_CHECK_CODE(code, lino, _end);
  }

  // extract the timeout parameter
  pSupp->timeout = taosAnalysisParseTimout(pHashMap, id);
  pSupp->wncheck = taosAnalysisParseWncheck(pHashMap, id);

  // extract the forecast rows
  char* pRows = taosHashGet(pHashMap, ALGO_OPT_FORECASTROWS_NAME, strlen(ALGO_OPT_FORECASTROWS_NAME));
  if (pRows != NULL) {
    int64_t v = 0;
    code = toInteger(pRows, taosHashGetValueSize(pRows), 10, &v);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s invalid value of forecast option:%s", id, ALGO_OPT_FORECASTROWS_NAME);
      code = TSDB_CODE_INVALID_PARA;
      goto _end;
    }

    pSupp->forecastRows = v;
    qDebug("%s forecast rows:%"PRId64, id, pSupp->forecastRows);
  } else {
    qDebug("%s forecast rows not found:%s, use default:%" PRId64, id, pSupp->pOptions, pSupp->forecastRows);
  }

  if (pSupp->forecastRows > ANALY_FORECAST_RES_MAX_ROWS) {
    qError("%s required too many forecast rows, max allowed:%d, required:%" PRId64, id, ANALY_FORECAST_RES_MAX_ROWS,
           pSupp->forecastRows);
    code = TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS;
    goto _end;
  }

  // extract the confidence interval value
  char* pConf = taosHashGet(pHashMap, ALGO_OPT_CONF_NAME, strlen(ALGO_OPT_CONF_NAME));
  if (pConf != NULL) {
    char*  endPtr = NULL;
    double v = taosStr2Double(pConf, &endPtr);
    pSupp->conf = v;

    if (v <= 0 || v > 1.0) {
      pSupp->conf = ANALY_FORECAST_DEFAULT_CONF;
      qWarn("%s valid conf range is (0, 1], user specified:%.2f out of range, set the default:%.2f", id, v,
             pSupp->conf);
    } else {
      qDebug("%s forecast conf:%.2f", id, pSupp->conf);
    }
  } else {
    qDebug("%s forecast conf not found:%s, use default:%.2f", id, pSupp->pOptions, pSupp->conf);
  }

  // extract the start timestamp
  char* pStart = taosHashGet(pHashMap, ALGO_OPT_START_NAME, strlen(ALGO_OPT_START_NAME));
  if (pStart != NULL) {
    int64_t v = 0;
    code = toInteger(pStart, taosHashGetValueSize(pStart), 10, &v);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s invalid value of forecast option:%s", id, ALGO_OPT_START_NAME);
      code = TSDB_CODE_INVALID_PARA;
      goto _end;
    }

    pSupp->startTs = v;
    pSupp->setStart = 1;
    qDebug("%s forecast set start ts:%"PRId64, id, pSupp->startTs);
  }

  // extract the time step
  char* pEvery = taosHashGet(pHashMap, ALGO_OPT_EVERY_NAME, strlen(ALGO_OPT_EVERY_NAME));
  if (pEvery != NULL) {
    int64_t v = 0;
    code = toInteger(pEvery, taosHashGetValueSize(pEvery), 10, &v);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s invalid value of forecast option:%s", id, ALGO_OPT_EVERY_NAME);
      code = TSDB_CODE_INVALID_PARA;
      goto _end;
    }

    pSupp->every = v;
    pSupp->setEvery = 1;
    qDebug("%s forecast set every ts:%"PRId64, id, pSupp->every);
  }

  // extract the dynamic real feature for covariate forecasting
  void*        pIter = NULL;
  size_t       keyLen = 0;
  const char*  prefix = "dynamic_real_";
  const char*  suffix = "_col";
  const size_t prefixLen = strlen(prefix);
  const size_t suffixLen = strlen(suffix);

  while ((pIter = taosHashIterate(pHashMap, pIter))) {
    const char* pVal = pIter;
    char*       pKey = taosHashGetKey((void*)pVal, &keyLen);
    int32_t     idx = 0;
    char        nameBuf[512] = {0};

    // only "dynamic_real_N" keys carry future data; their "_col" companions are looked up below
    if (keyLen < prefixLen || strncmp(pKey, prefix, prefixLen) != 0) {
      continue;
    }

    if (strncmp(&pKey[keyLen - suffixLen], suffix, suffixLen) == 0) {
      continue;
    }

    // the key plus "_col" and the terminating NUL must fit into nameBuf
    if (keyLen + suffixLen >= sizeof(nameBuf)) {
      qError("%s dynamic real option name is too long, length:%d", id, (int32_t)keyLen);
      code = TSDB_CODE_INVALID_PARA;
      goto _end;
    }

    // scan the NUL-terminated copy, the hash key itself carries an explicit length
    memcpy(nameBuf, pKey, keyLen);
    if (sscanf(nameBuf, "dynamic_real_%d", &idx) != 1) {
      continue;
    }

    memcpy(&nameBuf[keyLen], suffix, suffixLen);

    void* pCol = taosHashGet(pHashMap, nameBuf, strlen(nameBuf));
    if (pCol == NULL) {
      char* pTmp = taosStrndupi(pKey, keyLen);
      qError("%s dynamic real column related:%s column name:%s not specified", id, pTmp, nameBuf);

      taosMemoryFree(pTmp);
      code = TSDB_CODE_INVALID_PARA;
      goto _end;
    }

    // build dynamic_real_feature
    code = forecastAddDynamicReal(pSupp, pCol, pVal, id);
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }
  }

_end:
  taosHashCleanup(pHashMap);
  return code;
}
951

952
/*
 * Open the analytic buffer used to hand the input data to the forecast
 * service and write the column meta section.
 *
 * Column order written: "ts", "val", then one "dynamic_real_N" column for each
 * entry of pDynamicRealList that has a covariate column of the same name, and
 * finally one "past_dynamic_real_N" column for every covariate that is left.
 * Matched covariates are copied into the dynamic real entry and removed from
 * pCovariateSlotList, so the list only holds the past covariates afterwards.
 *
 * On failure the buffer is closed and destroyed before the error code is
 * returned.
 */
static int32_t forecastCreateBuf(SForecastSupp* pSupp) {
  SAnalyticBuf* pAnalyBuf = &pSupp->analyBuf;
  int64_t       stamp = 0;  // taosGetTimestampMs();
  int32_t       colIdx = 0;

  pAnalyBuf->bufType = ANALYTICS_BUF_TYPE_JSON_COL;
  snprintf(pAnalyBuf->fileName, sizeof(pAnalyBuf->fileName), "%s/tdengine-forecast-%" PRId64, tsTempDir, stamp);

  // timestamp + target value + all covariate columns
  int32_t numOfCols = taosArrayGetSize(pSupp->pCovariateSlotList) + 2;

  int32_t code = tsosAnalyBufOpen(pAnalyBuf, numOfCols);
  if (code != 0) goto _OVER;

  code = taosAnalyBufWriteColMeta(pAnalyBuf, colIdx++, TSDB_DATA_TYPE_TIMESTAMP, "ts");
  if (code != 0) goto _OVER;

  code = taosAnalyBufWriteColMeta(pAnalyBuf, colIdx++, pSupp->targetValType, "val");
  if (code != 0) goto _OVER;

  int32_t numOfFuture = taosArrayGetSize(pSupp->pDynamicRealList);
  int32_t numOfPast = taosArrayGetSize(pSupp->pCovariateSlotList);

  if (numOfPast >= numOfFuture) {
    // covariates that also have future values are emitted first
    for (int32_t f = 0; f < numOfFuture; ++f) {
      SColFutureData* pFuture = taosArrayGet(pSupp->pDynamicRealList, f);

      for (int32_t c = 0; c < taosArrayGetSize(pSupp->pCovariateSlotList); ++c) {
        SColumn* pCovariate = taosArrayGet(pSupp->pCovariateSlotList, c);
        if (strcmp(pCovariate->name, pFuture->pName) != 0) {
          continue;
        }

        char colName[128] = {0};
        (void) tsnprintf(colName, tListLen(colName), "dynamic_real_%d", f + 1);
        code = taosAnalyBufWriteColMeta(pAnalyBuf, colIdx++, pCovariate->type, colName);
        if (code != 0) {
          goto _OVER;
        }

        // keep a copy in the future data entry, then drop it from the past covariate list
        memcpy(&pFuture->col, pCovariate, sizeof(SColumn));
        taosArrayRemove(pSupp->pCovariateSlotList, c);
        break;
      }
    }

    // whatever is left only has historical values
    numOfPast = taosArrayGetSize(pSupp->pCovariateSlotList);
    for (int32_t c = 0; c < numOfPast; ++c) {
      SColumn* pCovariate = taosArrayGet(pSupp->pCovariateSlotList, c);

      char colName[128] = {0};
      (void)tsnprintf(colName, tListLen(colName), "past_dynamic_real_%d", c + 1);

      code = taosAnalyBufWriteColMeta(pAnalyBuf, colIdx++, pCovariate->type, colName);
      if (code) {
        goto _OVER;
      }
    }
  }

  code = taosAnalyBufWriteDataBegin(pAnalyBuf);
  if (code != 0) goto _OVER;

  for (int32_t col = 0; col < pSupp->numOfInputCols; ++col) {
    code = taosAnalyBufWriteColBegin(pAnalyBuf, col);
    if (code != 0) goto _OVER;
  }

_OVER:
  if (code != 0) {
    (void)taosAnalyBufClose(pAnalyBuf);
    taosAnalyBufDestroy(pAnalyBuf);
  }
  return code;
}
1024

1025
/*
 * Bring the forecast operator back to its not-opened state so that it can be
 * executed again: the result block is emptied, the covariate list, the
 * analytic buffer and both expression supports are released, and then the
 * expressions, filter, input/output description, options and the analytic
 * buffer are rebuilt from the physical plan node.
 *
 * Returns the first error reported by any of the rebuild steps.
 *
 * NOTE(review): pDynamicRealList and pOptions of the forecast context are not
 * released here before the input and options are parsed again -- confirm that
 * forecastParseInput()/forecastParseOpt() do not leak or duplicate them.
 */
static int32_t resetForecastOperState(SOperatorInfo* pOper) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  SForecastOperatorInfo*  pForecast = pOper->info;
  SExecTaskInfo*          pTask = pOper->pTaskInfo;
  const char*             taskId = pOper->pTaskInfo->id.str;
  SForecastFuncPhysiNode* pPlan = (SForecastFuncPhysiNode*)pOper->pPhyNode;
  SForecastSupp*          pSupp = &pForecast->forecastSupp;

  pOper->status = OP_NOT_OPENED;

  // drop everything produced by the previous run
  blockDataCleanup(pForecast->pRes);

  taosArrayDestroy(pSupp->pCovariateSlotList);
  pSupp->pCovariateSlotList = NULL;

  taosAnalyBufDestroy(&pSupp->analyBuf);

  cleanupExprSupp(&pOper->exprSupp);
  cleanupExprSupp(&pForecast->scalarSup);

  // rebuild the function expressions and the filter
  int32_t    exprCount = 0;
  SExprInfo* pExprs = NULL;

  TAOS_CHECK_EXIT(createExprInfo(pPlan->pFuncs, NULL, &pExprs, &exprCount));

  TAOS_CHECK_EXIT(initExprSupp(&pOper->exprSupp, pExprs, exprCount, &pTask->storageAPI.functionStore));

  TAOS_CHECK_EXIT(filterInitFromNode((SNode*)pPlan->node.pConditions, &pOper->exprSupp.pFilterInfo, 0,
                                     pTask->pStreamRuntimeInfo));

  // rebuild the forecast context
  TAOS_CHECK_EXIT(forecastParseInput(pSupp, pPlan->pFuncs, taskId));

  TAOS_CHECK_EXIT(forecastParseOutput(pSupp, &pOper->exprSupp));

  TAOS_CHECK_EXIT(forecastParseOpt(pSupp, taskId));

  TAOS_CHECK_EXIT(forecastCreateBuf(pSupp));

  // optional scalar expressions evaluated before the forecast function
  if (pPlan->pExprs != NULL) {
    int32_t    scalarCount = 0;
    SExprInfo* pScalarExprs = NULL;
    TAOS_CHECK_EXIT(createExprInfo(pPlan->pExprs, NULL, &pScalarExprs, &scalarCount));
    TAOS_CHECK_EXIT(initExprSupp(&pForecast->scalarSup, pScalarExprs, scalarCount, &pTask->storageAPI.functionStore));
  }

  initResultSizeInfo(&pOper->resultInfo, 4096);

_exit:

  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
1079

1080

1081

1082

1083
/*
 * Create the forecast operator for the given physical plan node.
 *
 * downstream  the single child operator that feeds this operator
 * pPhyNode    the forecast physical plan node (SForecastFuncPhysiNode)
 * pTaskInfo   the owning task; its code field is set when creation fails
 * pOptrInfo   [out] receives the new operator on success
 *
 * Returns TSDB_CODE_SUCCESS, or the error code of the first failing step. On
 * failure the partially built operator info, the operator and the downstream
 * operator are destroyed.
 */
int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
                                   SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t code = 0;
  int32_t lino = 0;
  // must be initialized before the first "goto _error": the error path logs it
  const char*            pId = pTaskInfo->id.str;
  SForecastOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SForecastOperatorInfo));
  SOperatorInfo*         pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pOperator == NULL || pInfo == NULL) {
    code = terrno;
    goto _error;
  }

  pOperator->pPhyNode = pPhyNode;

  SForecastSupp*          pSupp = &pInfo->forecastSupp;
  SForecastFuncPhysiNode* pForecastPhyNode = (SForecastFuncPhysiNode*)pPhyNode;
  SExprSupp*              pExprSup = &pOperator->exprSupp;
  int32_t                 numOfExprs = 0;
  SExprInfo*              pExprInfo = NULL;

  code = createExprInfo(pForecastPhyNode->pFuncs, NULL, &pExprInfo, &numOfExprs);
  QUERY_CHECK_CODE(code, lino, _error);

  code = initExprSupp(pExprSup, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore);
  QUERY_CHECK_CODE(code, lino, _error);

  // optional scalar expressions evaluated before the forecast function
  if (pForecastPhyNode->pExprs != NULL) {
    int32_t    num = 0;
    SExprInfo* pScalarExprInfo = NULL;
    code = createExprInfo(pForecastPhyNode->pExprs, NULL, &pScalarExprInfo, &num);
    QUERY_CHECK_CODE(code, lino, _error);

    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, num, &pTaskInfo->storageAPI.functionStore);
    QUERY_CHECK_CODE(code, lino, _error);
  }

  code = filterInitFromNode((SNode*)pForecastPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
                            pTaskInfo->pStreamRuntimeInfo);
  QUERY_CHECK_CODE(code, lino, _error);

  code = forecastParseInput(pSupp, pForecastPhyNode->pFuncs, pId);
  QUERY_CHECK_CODE(code, lino, _error);

  code = forecastParseOutput(pSupp, pExprSup);
  QUERY_CHECK_CODE(code, lino, _error);

  code = forecastParseOpt(pSupp, pId);
  QUERY_CHECK_CODE(code, lino, _error);

  code = forecastCreateBuf(pSupp);
  QUERY_CHECK_CODE(code, lino, _error);

  initResultSizeInfo(&pOperator->resultInfo, 4096);

  pInfo->pRes = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
  QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);

  setOperatorInfo(pOperator, "ForecastOperator", QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, forecastNext, NULL, destroyForecastInfo, optrDefaultBufFn,
                                         NULL, optrDefaultGetNextExtFn, NULL);

  setOperatorResetStateFn(pOperator, resetForecastOperState);

  code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _error);

  code = appendDownstream(pOperator, &downstream, 1);
  QUERY_CHECK_CODE(code, lino, _error);

  *pOptrInfo = pOperator;

  qDebug("%s forecast env is initialized, option:%s", pId, pSupp->pOptions);
  return TSDB_CODE_SUCCESS;

_error:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s %s failed at line %d since %s", pId, __func__, lino, tstrerror(code));
  }
  if (pInfo != NULL) destroyForecastInfo(pInfo);
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
  pTaskInfo->code = code;
  return code;
}
1169

1170
static void destroyColFutureData(void* p) {
×
1171
  SColFutureData* pData = p;
×
1172
  taosMemoryFree(pData->pName);
×
1173
  colDataDestroy(&pData->data);
×
1174
}
×
1175

1176
static void destroyForecastInfo(void* param) {
×
1177
  SForecastOperatorInfo* pInfo = (SForecastOperatorInfo*)param;
×
1178

1179
  blockDataDestroy(pInfo->pRes);
×
1180
  pInfo->pRes = NULL;
×
1181

1182
  taosArrayDestroy(pInfo->forecastSupp.pCovariateSlotList);
×
1183
  pInfo->forecastSupp.pCovariateSlotList = NULL;
×
1184

1185
  taosArrayDestroyEx(pInfo->forecastSupp.pDynamicRealList, destroyColFutureData);
×
1186
  pInfo->forecastSupp.pDynamicRealList = NULL;
×
1187

1188
  taosMemoryFree(pInfo->forecastSupp.pOptions);
×
1189

1190
  cleanupExprSupp(&pInfo->scalarSup);
×
1191
  taosAnalyBufDestroy(&pInfo->forecastSupp.analyBuf);
×
1192
  taosMemoryFreeClear(param);
×
1193
}
×
1194

1195
#else
1196

1197
/*
 * Variant used when the analytics feature (USE_ANALYTICS) is not compiled in:
 * no operator is created and TSDB_CODE_OPS_NOT_SUPPORT is always returned.
 * None of the arguments are used.
 */
int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
                                   SOperatorInfo** pOptrInfo) {
  const int32_t notSupported = TSDB_CODE_OPS_NOT_SUPPORT;
  return notSupported;
}
1201

1202
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc