• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4295

14 Jun 2025 08:14AM UTC coverage: 62.777% (-0.1%) from 62.881%
#4295

push

travis-ci

web-flow
refactor: update container_build.sh to include branch option for taosadapter (#31367)

157612 of 320085 branches covered (49.24%)

Branch coverage included in aggregate %.

243488 of 318844 relevant lines covered (76.37%)

6438123.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.5
/source/libs/executor/src/forecastoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "querytask.h"
21
#include "tanalytics.h"
22
#include "tcommon.h"
23
#include "tcompare.h"
24
#include "tdatablock.h"
25
#include "tfill.h"
26
#include "ttime.h"
27

28
#ifdef USE_ANALYTICS
29

30
#define ALGO_OPT_RETCONF_NAME      "return_conf"
31
#define ALGO_OPT_FORECASTROWS_NAME "rows"
32
#define ALGO_OPT_CONF_NAME         "conf"
33
#define ALGO_OPT_START_NAME        "start"
34
#define ALGO_OPT_EVERY_NAME        "every"
35

36
// One "future" dynamic-real covariate column: the user-supplied values for the
// rows to be forecast, plus the input column they belong to.
typedef struct {
37
  char*           pName;      // column name cloned from the "dynamic_real_N_col" option value
38
  SColumnInfoData data;       // future values parsed from the "dynamic_real_N" option value
39
  SColumn         col;        // col.slotId / col.type are read when this column is written to the analysis buffer
40
  int32_t         numOfRows;  // number of future values stored in `data`
41
} SColFutureData;
42

43
// Working state of one forecast operator: parsed options, input/output slot
// mapping, per-group statistics and the buffer sent to the analysis node.
typedef struct {
44
  char         algoName[TSDB_ANALYTIC_ALGO_NAME_LEN];  // filled by taosAnalysisParseAlgo()
45
  char         algoUrl[TSDB_ANALYTIC_ALGO_URL_LEN];    // filled by taosAnalysisParseAlgo(); target of the forecast request
46
  char         algoOpt[TSDB_ANALYTIC_ALGO_OPTION_LEN]; // raw option string copied from the forecast() option parameter
47
  int64_t      maxTs;           // largest input timestamp cached so far
48
  int64_t      minTs;           // smallest input timestamp cached so far
49
  int64_t      numOfRows;       // number of rows written into analyBuf
50
  uint64_t     groupId;         // group currently being cached
51
  int64_t      forecastRows;    // number of rows to forecast ("rows" option or default)
52
  int64_t      cachedRows;      // sum of the row counts of the input blocks of the current group
53
  int32_t      numOfBlocks;     // input blocks cached since the last aggregate
54
  int64_t      timeout;         // value returned by taosAnalysisParseTimout()
55
  int16_t      resTsSlot;       // output slot of the forecast timestamp, -1 if not requested
56
  int16_t      resValSlot;      // output slot of the forecast value, -1 if not requested
57
  int16_t      resLowSlot;      // output slot of the lower confidence bound, -1 if not requested
58
  int16_t      resHighSlot;     // output slot of the upper confidence bound, -1 if not requested
59
  int16_t      inputTsSlot;     // input slot of the timestamp column
60
  int16_t      targetValSlot;   // input slot of the column to forecast
61
  int8_t       targetValType;   // data type of the column to forecast
62
  int8_t       inputPrecision;  // precision of the input timestamp column
63
  int8_t       wncheck;         // value returned by taosAnalysisParseWncheck()
64
  double       conf;            // confidence level; values outside (0, 1] fall back to the default
65
  int64_t      startTs;         // "start" option, valid only when setStart != 0
66
  int64_t      every;           // "every" option, valid only when setEvery != 0
67
  int8_t       setStart;        // 1 when the "start" option was given
68
  int8_t       setEvery;        // 1 when the "every" option was given
69
  SArray*      pCovariateSlotList;   // covariate slot list, elements are SColumn
70
  SArray*      pDynamicRealList;     // dynamic real data list, elements are SColFutureData
71
  int32_t      numOfInputCols;  // ts + target column (2) plus the covariate columns
72
  SAnalyticBuf analyBuf;        // buffer holding the request sent to the analysis node
73
} SForecastSupp;
74

75
// Operator-private data of the forecast operator.
typedef struct SForecastOperatorInfo {
76
  SSDataBlock*  pRes;          // result block that forecastNext() cleans up, fills and returns
77
  SExprSupp     scalarSup;  // scalar calculation
78
  SForecastSupp forecastSupp;  // forecast options and per-group working state
79
} SForecastOperatorInfo;
80

81
static void destroyForecastInfo(void* param);
82
static int32_t forecastParseOpt(SForecastSupp* pSupp, const char* id);
83

84
/*
 * Make sure pBlock can take newRowsNum additional rows on top of the rows it
 * already holds.
 *
 * The capacity requested from blockDataEnsureCapacity() is the total
 * (current rows + new rows). Passing only newRowsNum would not grow a block
 * that is already full.
 *
 * Returns TSDB_CODE_SUCCESS, or the error reported by blockDataEnsureCapacity().
 */
static FORCE_INLINE int32_t forecastEnsureBlockCapacity(SSDataBlock* pBlock, int32_t newRowsNum) {
  int64_t required = (int64_t)pBlock->info.rows + newRowsNum;
  if (required <= (int64_t)pBlock->info.capacity) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = blockDataEnsureCapacity(pBlock, (uint32_t)required);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
97

98
/*
 * Append all rows of one input block to the analysis buffer.
 *
 * For every row the columns are written in this order: timestamp, target
 * value, the dynamic-real columns, then the covariate columns. The running
 * min/max timestamp and the row counter in pSupp are updated on the way.
 *
 * pSupp  forecast state; analyBuf, minTs, maxTs, numOfRows and numOfBlocks are updated
 * pBlock input block to cache
 * id     task id used as log prefix
 *
 * Returns 0 on success, TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS when the cached row
 * count exceeds ANALY_FORECAST_MAX_ROWS, terrno when a required column cannot
 * be located in the block, or the error of the failed buffer write.
 */
static int32_t forecastCacheBlock(SForecastSupp* pSupp, SSDataBlock* pBlock, const char* id) {
  int32_t       code = TSDB_CODE_SUCCESS;
  SAnalyticBuf* pBuf = &pSupp->analyBuf;

  if (pSupp->cachedRows > ANALY_FORECAST_MAX_ROWS) {
    code = TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS;
    qError("%s rows:%" PRId64 " for forecast cache, error happens, code:%s, upper limit:%d", id, pSupp->cachedRows,
           tstrerror(code), ANALY_FORECAST_MAX_ROWS);
    return code;
  }

  pSupp->numOfBlocks++;
  qDebug("%s block:%d, %p rows:%" PRId64, id, pSupp->numOfBlocks, pBlock, pBlock->info.rows);

  // the timestamp/target columns are the same for every row, so resolve them once
  SColumnInfoData* pValCol = taosArrayGet(pBlock->pDataBlock, pSupp->targetValSlot);
  SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSupp->inputTsSlot);
  if (pTsCol == NULL || pValCol == NULL) {
    qError("%s failed to locate the ts/target column in input block, ts slot:%d, target slot:%d", id,
           pSupp->inputTsSlot, pSupp->targetValSlot);
    return terrno;
  }

  const int16_t valType = pValCol->info.type;
  const int32_t numOfDynamicReal = (int32_t)taosArrayGetSize(pSupp->pDynamicRealList);
  const int32_t numOfCovariate = (int32_t)taosArrayGetSize(pSupp->pCovariateSlotList);

  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
    int32_t index = 0;
    int64_t ts = ((TSKEY*)pTsCol->pData)[j];
    char*   val = colDataGetData(pValCol, j);

    pSupp->minTs = MIN(pSupp->minTs, ts);
    pSupp->maxTs = MAX(pSupp->maxTs, ts);
    pSupp->numOfRows++;

    // write the primary time stamp column data
    code = taosAnalyBufWriteColData(pBuf, index++, TSDB_DATA_TYPE_TIMESTAMP, &ts);
    if (TSDB_CODE_SUCCESS != code) {
      qError("%s failed to write ts in buf, code:%s", id, tstrerror(code));
      return code;
    }

    // write the main column for forecasting
    code = taosAnalyBufWriteColData(pBuf, index++, valType, val);
    if (TSDB_CODE_SUCCESS != code) {
      qError("%s failed to write val in buf, code:%s", id, tstrerror(code));
      return code;
    }

    // let's handle the dynamic_real_columns
    for (int32_t i = 0; i < numOfDynamicReal; ++i) {
      SColFutureData* pCol = taosArrayGet(pSupp->pDynamicRealList, i);
      if (pCol == NULL) {
        return terrno;
      }

      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pCol->col.slotId);
      if (pColData == NULL) {
        qError("%s failed to locate dynamic real column in input block, slot:%d", id, pCol->col.slotId);
        return terrno;
      }

      char* pVal = colDataGetData(pColData, j);
      code = taosAnalyBufWriteColData(pBuf, index++, pCol->col.type, pVal);
      if (TSDB_CODE_SUCCESS != code) {
        qError("%s failed to write val in buf, code:%s", id, tstrerror(code));
        return code;
      }
    }

    // now handle the past_dynamic_real (covariate) columns
    for (int32_t i = 0; i < numOfCovariate; ++i) {
      SColumn* pCol = taosArrayGet(pSupp->pCovariateSlotList, i);
      if (pCol == NULL) {
        return terrno;
      }

      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
      if (pColData == NULL) {
        qError("%s failed to locate covariate column in input block, slot:%d", id, pCol->slotId);
        return terrno;
      }

      char* pVal = colDataGetData(pColData, j);
      code = taosAnalyBufWriteColData(pBuf, index++, pCol->type, pVal);
      if (TSDB_CODE_SUCCESS != code) {
        qError("%s failed to write val in buf, code:%s", id, tstrerror(code));
        return code;
      }
    }
  }

  return 0;
}
170

171
/*
 * Finish the analysis buffer: append the future dynamic-real values, close
 * every column, then write the request options (option string, algo,
 * precision, wncheck, return_conf, forecast_rows, conf, every, start and the
 * optional covariate type) and close the buffer.
 *
 * When "every" was not given by the user it is derived as the average
 * distance between the cached timestamps; when "start" was not given it is
 * the last cached timestamp plus one step.
 *
 * pSupp forecast state, see above
 * id    task id used as log prefix
 *
 * Returns 0 on success, TSDB_CODE_ANA_ANODE_NOT_ENOUGH_ROWS when there are too
 * few history rows, or the error code of the failed buffer operation.
 */
static int32_t forecastCloseBuf(SForecastSupp* pSupp, const char* id) {
  SAnalyticBuf* pBuf = &pSupp->analyBuf;
  int32_t       code = 0;
  const int32_t numOfFuture = (int32_t)taosArrayGetSize(pSupp->pDynamicRealList);

  for (int32_t i = 0; i < pSupp->numOfInputCols; ++i) {
    // columns 0 and 1 are the primary timestamp and the forecast column;
    // the future dynamic real column data follow from column 2 on
    if ((i >= 2) && ((i - 2) < numOfFuture)) {
      SColFutureData* pCol = taosArrayGet(pSupp->pDynamicRealList, i - 2);
      int16_t         valType = pCol->col.type;
      for (int32_t j = 0; j < pCol->numOfRows; ++j) {
        char* pVal = colDataGetData(&pCol->data, j);
        code = taosAnalyBufWriteColData(pBuf, i, valType, pVal);
        if (code != 0) {
          qError("%s failed to write future dynamic real val in buf, code:%s", id, tstrerror(code));
          return code;
        }
      }
    }

    code = taosAnalyBufWriteColEnd(pBuf, i);
    if (code != 0) return code;
  }

  code = taosAnalyBufWriteDataEnd(pBuf);
  if (code != 0) return code;

  code = taosAnalyBufWriteOptStr(pBuf, "option", pSupp->algoOpt);
  if (code != 0) return code;

  code = taosAnalyBufWriteOptStr(pBuf, "algo", pSupp->algoName);
  if (code != 0) return code;

  const char* prec = TSDB_TIME_PRECISION_MILLI_STR;
  if (pSupp->inputPrecision == TSDB_TIME_PRECISION_MICRO) prec = TSDB_TIME_PRECISION_MICRO_STR;
  if (pSupp->inputPrecision == TSDB_TIME_PRECISION_NANO) prec = TSDB_TIME_PRECISION_NANO_STR;
  code = taosAnalyBufWriteOptStr(pBuf, "prec", prec);
  if (code != 0) return code;

  code = taosAnalyBufWriteOptInt(pBuf, ALGO_OPT_WNCHECK_NAME, pSupp->wncheck);
  if (code != 0) return code;

  bool noConf = (pSupp->resHighSlot == -1 && pSupp->resLowSlot == -1);
  code = taosAnalyBufWriteOptInt(pBuf, ALGO_OPT_RETCONF_NAME, !noConf);
  if (code != 0) return code;

  if (pSupp->cachedRows < ANALY_FORECAST_MIN_ROWS) {
    // report the number of history rows that was actually checked
    qError("%s history rows for forecasting not enough, min required:%d, current:%" PRId64, id, ANALY_FORECAST_MIN_ROWS,
           pSupp->cachedRows);
    return TSDB_CODE_ANA_ANODE_NOT_ENOUGH_ROWS;
  }

  code = taosAnalyBufWriteOptInt(pBuf, "forecast_rows", pSupp->forecastRows);
  if (code != 0) return code;

  code = taosAnalyBufWriteOptFloat(pBuf, "conf", pSupp->conf);
  if (code != 0) return code;

  int64_t every = 0;
  if (pSupp->setEvery != 0) {
    every = pSupp->every;
  } else {
    // the step is derived from the cached rows; at least two rows are needed
    // to do so, otherwise the division below is by zero
    if (pSupp->numOfRows < 2) {
      qError("%s not enough rows to derive the forecast time step, rows:%" PRId64, id, pSupp->numOfRows);
      return TSDB_CODE_ANA_ANODE_NOT_ENOUGH_ROWS;
    }
    every = (pSupp->maxTs - pSupp->minTs) / (pSupp->numOfRows - 1);
  }

  code = taosAnalyBufWriteOptInt(pBuf, "every", every);
  if (code != 0) return code;

  int64_t start = (pSupp->setStart != 0) ? pSupp->startTs : pSupp->maxTs + every;
  code = taosAnalyBufWriteOptInt(pBuf, "start", start);
  if (code != 0) return code;

  if (taosArrayGetSize(pSupp->pCovariateSlotList) + taosArrayGetSize(pSupp->pDynamicRealList) > 0) {
    code = taosAnalyBufWriteOptStr(pBuf, "type", "covariate");
    if (code != 0) return code;
  }

  code = taosAnalyBufClose(pBuf);
  return code;
}
244

245
/*
 * Send the closed analysis buffer to the analysis node and append the
 * returned forecast rows to pBlock.
 *
 * The response is read as a JSON object with a "rows" count and a "res"
 * array. "res" must hold 4 arrays when a confidence bound was requested and
 * 2 otherwise; index 0 is read as timestamps, 1 as values, 2 as the lower
 * bound and 3 as the upper bound. Every array must contain exactly "rows"
 * items. A negative "rows" is treated as an error reported by the analysis
 * node, and the "msg" field is logged.
 *
 * pSupp  forecast state (url, timeout, output slots, request buffer)
 * pBlock result block; info.rows grows by the number of forecast rows
 * pId    task id used as log prefix
 *
 * Returns 0 on success, terrno when the value column is missing or the
 * request fails, TSDB_CODE_ANA_ANODE_RETURN_ERROR when the node reports an
 * error, TSDB_CODE_INVALID_JSON_FORMAT for a malformed response, or the
 * error of growing the result block.
 */
static int32_t forecastAnalysis(SForecastSupp* pSupp, SSDataBlock* pBlock, const char* pId) {
  SAnalyticBuf* pBuf = &pSupp->analyBuf;
  int32_t       resCurRow = pBlock->info.rows;
  int64_t       tmpI64 = 0;
  float         tmpFloat = 0;
  double        tmpDouble = 0;
  int32_t       code = 0;

  SColumnInfoData* pResValCol = taosArrayGet(pBlock->pDataBlock, pSupp->resValSlot);
  if (NULL == pResValCol) {
    return terrno;
  }

  SColumnInfoData* pResTsCol = ((pSupp->resTsSlot != -1) ? taosArrayGet(pBlock->pDataBlock, pSupp->resTsSlot) : NULL);
  SColumnInfoData* pResLowCol =
      ((pSupp->resLowSlot != -1) ? taosArrayGet(pBlock->pDataBlock, pSupp->resLowSlot) : NULL);
  SColumnInfoData* pResHighCol =
      (pSupp->resHighSlot != -1 ? taosArrayGet(pBlock->pDataBlock, pSupp->resHighSlot) : NULL);

  SJson* pJson = taosAnalySendReqRetJson(pSupp->algoUrl, ANALYTICS_HTTP_TYPE_POST, pBuf, pSupp->timeout);
  if (pJson == NULL) {
    return terrno;
  }

  int32_t rows = 0;
  tjsonGetInt32ValueFromDouble(pJson, "rows", rows, code);
  if (rows < 0 && code == 0) {
    char pMsg[1024] = {0};
    code = tjsonGetStringValue(pJson, "msg", pMsg);
    if (code != 0) {
      qError("%s failed to get msg from rsp, unknown error", pId);
    } else {
      qError("%s failed to exec forecast, msg:%s", pId, pMsg);
    }

    tjsonDelete(pJson);
    return TSDB_CODE_ANA_ANODE_RETURN_ERROR;
  }

  if (code < 0) {
    goto _OVER;
  }

  SJson* res = tjsonGetObjectItem(pJson, "res");
  if (res == NULL) goto _OVER;
  int32_t ressize = tjsonGetArraySize(res);
  bool    returnConf = (pSupp->resHighSlot != -1 || pSupp->resLowSlot != -1);

  if ((returnConf && (ressize != 4)) || ((!returnConf) && (ressize != 2))) {
    goto _OVER;
  }

  // the row count comes from the remote side: make sure the result block can
  // hold all of the rows before any column is written
  if (rows > 0) {
    code = blockDataEnsureCapacity(pBlock, (uint32_t)(pBlock->info.rows + rows));
    if (code != TSDB_CODE_SUCCESS) goto _OVER;
  }

  if (pResTsCol != NULL) {
    resCurRow = pBlock->info.rows;
    SJson* tsJsonArray = tjsonGetArrayItem(res, 0);
    if (tsJsonArray == NULL) goto _OVER;
    int32_t tsSize = tjsonGetArraySize(tsJsonArray);
    if (tsSize != rows) goto _OVER;
    for (int32_t i = 0; i < tsSize; ++i) {
      SJson* tsJson = tjsonGetArrayItem(tsJsonArray, i);
      tjsonGetObjectValueBigInt(tsJson, &tmpI64);
      colDataSetInt64(pResTsCol, resCurRow, &tmpI64);
      resCurRow++;
    }
  }

  if (pResLowCol != NULL) {
    resCurRow = pBlock->info.rows;
    SJson* lowJsonArray = tjsonGetArrayItem(res, 2);
    if (lowJsonArray == NULL) goto _OVER;
    int32_t lowSize = tjsonGetArraySize(lowJsonArray);
    if (lowSize != rows) goto _OVER;
    for (int32_t i = 0; i < lowSize; ++i) {
      SJson* lowJson = tjsonGetArrayItem(lowJsonArray, i);
      tjsonGetObjectValueDouble(lowJson, &tmpDouble);
      tmpFloat = (float)tmpDouble;
      colDataSetFloat(pResLowCol, resCurRow, &tmpFloat);
      resCurRow++;
    }
  }

  if (pResHighCol != NULL) {
    resCurRow = pBlock->info.rows;
    SJson* highJsonArray = tjsonGetArrayItem(res, 3);
    if (highJsonArray == NULL) goto _OVER;
    int32_t highSize = tjsonGetArraySize(highJsonArray);
    if (highSize != rows) goto _OVER;
    for (int32_t i = 0; i < highSize; ++i) {
      SJson* highJson = tjsonGetArrayItem(highJsonArray, i);
      tjsonGetObjectValueDouble(highJson, &tmpDouble);
      tmpFloat = (float)tmpDouble;
      colDataSetFloat(pResHighCol, resCurRow, &tmpFloat);
      resCurRow++;
    }
  }

  resCurRow = pBlock->info.rows;
  SJson* valJsonArray = tjsonGetArrayItem(res, 1);
  if (valJsonArray == NULL) goto _OVER;
  int32_t valSize = tjsonGetArraySize(valJsonArray);
  if (valSize != rows) goto _OVER;
  for (int32_t i = 0; i < valSize; ++i) {
    SJson* valJson = tjsonGetArrayItem(valJsonArray, i);
    tjsonGetObjectValueDouble(valJson, &tmpDouble);

    colDataSetDouble(pResValCol, resCurRow, &tmpDouble);
    resCurRow++;
  }

  // the rows become visible only after every requested column was filled
  pBlock->info.rows += rows;

  tjsonDelete(pJson);
  return 0;

_OVER:
  tjsonDelete(pJson);
  if (code == 0) {
    // reached through a structural check, not through a failed call
    code = TSDB_CODE_INVALID_JSON_FORMAT;
  }

  qError("%s failed to perform forecast finalize since %s", pId, tstrerror(code));
  return code;
}
371

372
/*
 * Run the forecast for the rows cached so far: close the request buffer,
 * make room in the result block, then query the analysis node.
 *
 * Whatever the outcome, the cached block counter is reset and the analysis
 * buffer is destroyed before returning. The first failing step decides the
 * return code; later steps are skipped.
 */
static int32_t forecastAggregateBlocks(SForecastSupp* pSupp, SSDataBlock* pResBlock, const char* pId) {
  int32_t code = forecastCloseBuf(pSupp, pId);

  if (code == TSDB_CODE_SUCCESS) {
    code = forecastEnsureBlockCapacity(pResBlock, 1);
  }

  if (code == TSDB_CODE_SUCCESS) {
    code = forecastAnalysis(pSupp, pResBlock, pId);
  }

  if (code == TSDB_CODE_SUCCESS) {
    uInfo("%s block:%d, forecast finalize", pId, pSupp->numOfBlocks);
  }

  pSupp->numOfBlocks = 0;
  taosAnalyBufDestroy(&pSupp->analyBuf);
  return code;
}
393

394
/*
 * Operator "next" callback: pull blocks from the downstream operator, cache
 * them group by group and emit the forecast result.
 *
 * Blocks of the current group are appended to the analysis buffer. When a
 * block of another group shows up, the forecast for the finished group is
 * run first and the new block starts the next group. As soon as the result
 * block holds rows it is handed to the caller. When the downstream is
 * exhausted, the forecast for the pending blocks is run.
 *
 * On failure the error is stored in the task and control leaves through
 * T_LONG_JMP. Otherwise *ppRes is the result block, or NULL when it is empty.
 */
static int32_t forecastNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                code = TSDB_CODE_SUCCESS;
  int32_t                lino = 0;
  SExecTaskInfo*         pTaskInfo = pOperator->pTaskInfo;
  SForecastOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*           pOut = pInfo->pRes;
  SForecastSupp*         pSupp = &pInfo->forecastSupp;
  int64_t                beginUs = taosGetTimestampUs();
  int32_t                blockCnt = pSupp->numOfBlocks;
  const char*            pId = GET_TASKID(pOperator->pTaskInfo);

  blockDataCleanup(pOut);

  for (;;) {
    SSDataBlock* pIn = getNextBlockFromDownstream(pOperator, 0);
    if (pIn == NULL) {
      break;
    }

    bool newGroup = !(pSupp->groupId == 0 || pSupp->groupId == pIn->info.id.groupId);
    if (newGroup) {
      // finish the previous group before the new one is cached
      qDebug("%s group:%" PRId64 ", read finish for new group coming, blocks:%d", pId, pSupp->groupId, blockCnt);
      code = forecastAggregateBlocks(pSupp, pOut, pId);
      QUERY_CHECK_CODE(code, lino, _end);

      pSupp->groupId = pIn->info.id.groupId;
      blockCnt = 1;
      pSupp->cachedRows = pIn->info.rows;
      qDebug("%s group:%" PRId64 ", new group, rows:%" PRId64 ", total rows:%" PRId64, pId, pSupp->groupId,
             pIn->info.rows, pSupp->cachedRows);
      code = forecastCacheBlock(pSupp, pIn, pId);
      QUERY_CHECK_CODE(code, lino, _end);
    } else {
      pSupp->groupId = pIn->info.id.groupId;
      blockCnt++;
      pSupp->cachedRows += pIn->info.rows;
      qDebug("%s group:%" PRId64 ", blocks:%d, rows:%" PRId64 ", total rows:%" PRId64, pId, pSupp->groupId, blockCnt,
             pIn->info.rows, pSupp->cachedRows);
      code = forecastCacheBlock(pSupp, pIn, pId);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    if (pOut->info.rows > 0) {
      (*ppRes) = pOut;
      qDebug("%s group:%" PRId64 ", return to upstream, blocks:%d", pId, pOut->info.id.groupId, blockCnt);
      return code;
    }
  }

  // downstream exhausted: flush whatever is still cached
  if (blockCnt > 0) {
    qDebug("%s group:%" PRId64 ", read finish, blocks:%d", pId, pSupp->groupId, blockCnt);
    code = forecastAggregateBlocks(pSupp, pOut, pId);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  int64_t cost = taosGetTimestampUs() - beginUs;
  qDebug("%s all groups finished, cost:%" PRId64 "us", pId, cost);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s %s failed at line %d since %s", pId, __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  if (pOut->info.rows == 0) {
    (*ppRes) = NULL;
  } else {
    (*ppRes) = pOut;
  }
  return code;
}
461

462
/*
 * Map the forecast related output expressions to their result slots.
 *
 * All four result slots start as -1 (not requested). Each expression whose
 * function type is one of the forecast pseudo functions stores its result
 * slot id in the matching field; other expressions are ignored.
 *
 * Always returns 0.
 */
static int32_t forecastParseOutput(SForecastSupp* pSupp, SExprSupp* pExprSup) {
  pSupp->resTsSlot = -1;
  pSupp->resValSlot = -1;
  pSupp->resLowSlot = -1;
  pSupp->resHighSlot = -1;

  for (int32_t idx = 0; idx < pExprSup->numOfExprs; ++idx) {
    SExprInfo* pExpr = &pExprSup->pExprInfo[idx];
    int32_t    slot = pExpr->base.resSchema.slotId;

    switch (pExpr->pExpr->_function.functionType) {
      case FUNCTION_TYPE_FORECAST:
        pSupp->resValSlot = slot;
        break;
      case FUNCTION_TYPE_FORECAST_ROWTS:
        pSupp->resTsSlot = slot;
        break;
      case FUNCTION_TYPE_FORECAST_LOW:
        pSupp->resLowSlot = slot;
        break;
      case FUNCTION_TYPE_FORECAST_HIGH:
        pSupp->resHighSlot = slot;
        break;
      default:
        break;
    }
  }

  return 0;
}
485

486
/*
 * Validate the parameter list of a forecast() call.
 *
 * Accepted layouts:
 *   2 parameters : column, timestamp column
 *   3+ parameters: column [, covariate columns...], options value, timestamp column
 *
 * pFunc forecast function node whose parameters are checked
 * id    task id used as log prefix
 *
 * Returns 0 when the list is valid, TSDB_CODE_PLAN_INTERNAL_ERROR otherwise.
 * The line of the failing check is written to the log.
 */
static int32_t validInputParams(SFunctionNode* pFunc, const char* id) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t num = LIST_LENGTH(pFunc->pParameterList);

  TSDB_CHECK_CONDITION(num > 1, code, lino, _end, TSDB_CODE_PLAN_INTERNAL_ERROR);

  for (int32_t i = 0; i < num; ++i) {
    SNode* p = nodesListGetNode(pFunc->pParameterList, i);
    TSDB_CHECK_NULL(p, code, lino, _end, TSDB_CODE_PLAN_INTERNAL_ERROR);
  }

  if (num == 2) {  // column_name, timestamp_column_name
    SNode* p1 = nodesListGetNode(pFunc->pParameterList, 0);
    SNode* p2 = nodesListGetNode(pFunc->pParameterList, 1);

    TSDB_CHECK_CONDITION(p1->type == QUERY_NODE_COLUMN, code, lino, _end, TSDB_CODE_PLAN_INTERNAL_ERROR);
    TSDB_CHECK_CONDITION(p2->type == QUERY_NODE_COLUMN, code, lino, _end, TSDB_CODE_PLAN_INTERNAL_ERROR);
  } else if (num >= 3) {  // column_name_#1, column_name_#2...., analytics_options, timestamp_column_name
    for (int32_t i = 0; i < num - 2; ++i) {
      SNode* p1 = nodesListGetNode(pFunc->pParameterList, i);
      // use the check macro so that the failing line shows up in the log
      TSDB_CHECK_CONDITION(p1->type == QUERY_NODE_COLUMN, code, lino, _end, TSDB_CODE_PLAN_INTERNAL_ERROR);
    }

    SNode* p2 = nodesListGetNode(pFunc->pParameterList, num - 2);
    SNode* p3 = nodesListGetNode(pFunc->pParameterList, num - 1);

    TSDB_CHECK_CONDITION(p2->type == QUERY_NODE_VALUE, code, lino, _end, TSDB_CODE_PLAN_INTERNAL_ERROR);
    TSDB_CHECK_CONDITION(p3->type == QUERY_NODE_COLUMN, code, lino, _end, TSDB_CODE_PLAN_INTERNAL_ERROR);
  }

_end:
  if (code) {
    qError("%s valid the parameters failed, line:%d, code:%s", id, lino, tstrerror(code));
  }
  return code;
}
526

527
/*
 * Extract the input description of the forecast() call from the function
 * list: timestamp slot and precision, target column slot and type, the raw
 * option string and, when present, the covariate columns.
 *
 * Parameter layouts (checked by validInputParams()):
 *   column, ts                          -> default option "algo=holtwinters"
 *   column, options, ts
 *   column, covariates..., options, ts  -> covariates go to pCovariateSlotList
 *
 * pSupp  forecast state to fill
 * pFuncs target list of the physical plan node
 * id     task id used as log prefix
 *
 * Returns 0 on success, the error of validInputParams(), or terrno when a
 * list cannot be allocated or extended.
 * NOTE(review): lists allocated before a failure are left in pSupp; presumably
 * they are released by destroyForecastInfo() - confirm.
 */
static int32_t forecastParseInput(SForecastSupp* pSupp, SNodeList* pFuncs, const char* id) {
  int32_t code = 0;
  SNode*  pNode = NULL;

  pSupp->inputTsSlot = -1;
  pSupp->targetValSlot = -1;
  pSupp->targetValType = -1;
  pSupp->inputPrecision = -1;

  FOREACH(pNode, pFuncs) {
    if ((nodeType(pNode) == QUERY_NODE_TARGET) && (nodeType(((STargetNode*)pNode)->pExpr) == QUERY_NODE_FUNCTION)) {
      SFunctionNode* pFunc = (SFunctionNode*)((STargetNode*)pNode)->pExpr;
      int32_t        numOfParam = LIST_LENGTH(pFunc->pParameterList);

      if (pFunc->funcType == FUNCTION_TYPE_FORECAST) {
        code = validInputParams(pFunc, id);
        if (code) {
          return code;
        }

        pSupp->numOfInputCols = 2;

        // in every layout the target column comes first and the timestamp column last
        SColumnNode* pValNode = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
        SColumnNode* pTsNode = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, numOfParam - 1);

        pSupp->inputTsSlot = pTsNode->slotId;
        pSupp->inputPrecision = pTsNode->node.resType.precision;
        pSupp->targetValSlot = pValNode->slotId;
        pSupp->targetValType = pValNode->node.resType.type;

        if (numOfParam == 2) {
          // let's add the holtwinters as the default forecast algorithm
          tstrncpy(pSupp->algoOpt, "algo=holtwinters", sizeof(pSupp->algoOpt));
        } else {
          // the option string is always the last but one parameter
          SValueNode* pOptNode = (SValueNode*)nodesListGetNode(pFunc->pParameterList, numOfParam - 2);
          tstrncpy(pSupp->algoOpt, pOptNode->literal, sizeof(pSupp->algoOpt));
        }

        if (numOfParam > 3) {
          // everything between the target column and the option string is a covariate column
          pSupp->pCovariateSlotList = taosArrayInit(4, sizeof(SColumn));
          pSupp->pDynamicRealList = taosArrayInit(4, sizeof(SColFutureData));
          if (pSupp->pCovariateSlotList == NULL || pSupp->pDynamicRealList == NULL) {
            qError("%s failed to allocate the covariate column list, code:%s", id, tstrerror(terrno));
            return terrno;
          }

          for (int32_t i = 1; i < numOfParam - 2; ++i) {
            SColumnNode* p = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, i);
            SColumn      col = {
                     .slotId = p->slotId, .colType = p->colType, .type = p->node.resType.type, .bytes = p->node.resType.bytes};
            tstrncpy(col.name, p->colName, tListLen(col.name));

            if (taosArrayPush(pSupp->pCovariateSlotList, &col) == NULL) {
              qError("%s failed to add covariate column:%s, code:%s", id, col.name, tstrerror(terrno));
              return terrno;
            }
          }

          pSupp->numOfInputCols += (numOfParam - 3);
        }
      }
    }
  }

  return 0;
}
602

603
/*
 * Reset the option related part of the forecast state before the option
 * string is parsed: empty timestamp range and row counter, default
 * wncheck / forecast rows / confidence, and no user defined start or step.
 */
static void initForecastOpt(SForecastSupp* pSupp) {
  // statistics of the cached input
  pSupp->numOfRows = 0;
  pSupp->minTs = INT64_MAX;
  pSupp->maxTs = 0;

  // defaults of the user options
  pSupp->conf = ANALY_FORECAST_DEFAULT_CONF;
  pSupp->forecastRows = ANALY_FORECAST_DEFAULT_ROWS;
  pSupp->wncheck = ANALY_FORECAST_DEFAULT_WNCHECK;

  // neither "start" nor "every" has been given yet
  pSupp->setStart = 0;
  pSupp->setEvery = 0;
}
23✔
613

614
/*
 * Reject algorithms that are listed as unable to run with covariate columns.
 *
 * Returns TSDB_CODE_ANA_NOT_SUPPORT_FORECAST when covariate columns are
 * present and the algorithm name matches (case-insensitively) one of the
 * listed names, TSDB_CODE_SUCCESS otherwise.
 */
static int32_t filterNotSupportForecast(SForecastSupp* pSupp) {
  static const char* const noCovariateAlgos[] = {"holtwinters", "arima", "timemoe-fc"};

  if (taosArrayGetSize(pSupp->pCovariateSlotList) == 0) {
    return TSDB_CODE_SUCCESS;
  }

  for (size_t k = 0; k < sizeof(noCovariateAlgos) / sizeof(noCovariateAlgos[0]); ++k) {
    if (taosStrcasecmp(pSupp->algoName, noCovariateAlgos[k]) == 0) {
      return TSDB_CODE_ANA_NOT_SUPPORT_FORECAST;
    }
  }

  return TSDB_CODE_SUCCESS;
}
627

628

629
static int32_t forecastParseOpt(SForecastSupp* pSupp, const char* id) {
23✔
630
  int32_t   code = 0;
23✔
631
  int32_t   lino = 0;
23✔
632
  SHashObj* pHashMap = NULL;
23✔
633

634
  initForecastOpt(pSupp);
23✔
635

636
  code = taosAnalyGetOpts(pSupp->algoOpt, &pHashMap);
23✔
637
  if (code != TSDB_CODE_SUCCESS) {
23!
638
    return code;
×
639
  }
640

641
  code = taosAnalysisParseAlgo(pSupp->algoOpt, pSupp->algoName, pSupp->algoUrl, ANALY_ALGO_TYPE_FORECAST,
23✔
642
                               tListLen(pSupp->algoUrl), pHashMap, id);
643
  TSDB_CHECK_CODE(code, lino, _end);
23✔
644

645
  code = filterNotSupportForecast(pSupp);
22✔
646
  if (code) {
22!
647
    qError("%s not support forecast model, %s", id, pSupp->algoName);
×
648
    TSDB_CHECK_CODE(code, lino, _end);
×
649
  }
650

651
  // extract the timeout parameter
652
  pSupp->timeout = taosAnalysisParseTimout(pHashMap, id);
22✔
653
  pSupp->wncheck = taosAnalysisParseWncheck(pHashMap, id);
22✔
654

655
  // extract the forecast rows
656
  char* pRows = taosHashGet(pHashMap, ALGO_OPT_FORECASTROWS_NAME, strlen(ALGO_OPT_FORECASTROWS_NAME));
22✔
657
  if (pRows != NULL) {
22✔
658
    int64_t v = 0;
9✔
659
    code = toInteger(pRows, taosHashGetValueSize(pRows), 10, &v);
9✔
660

661
    pSupp->forecastRows = v;
9✔
662
    qDebug("%s forecast rows:%"PRId64, id, pSupp->forecastRows);
9!
663
  } else {
664
    qDebug("%s forecast rows not found:%s, use default:%" PRId64, id, pSupp->algoOpt, pSupp->forecastRows);
13!
665
  }
666

667
  if (pSupp->forecastRows > ANALY_FORECAST_RES_MAX_ROWS) {
22!
668
    qError("%s required too many forecast rows, max allowed:%d, required:%" PRId64, id, ANALY_FORECAST_RES_MAX_ROWS,
×
669
           pSupp->forecastRows);
670
    code = TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS;
×
671
    goto _end;
×
672
  }
673

674
  // extract the confidence interval value
675
  char* pConf = taosHashGet(pHashMap, ALGO_OPT_CONF_NAME, strlen(ALGO_OPT_CONF_NAME));
22✔
676
  if (pConf != NULL) {
22✔
677
    char*  endPtr = NULL;
19✔
678
    double v = taosStr2Double(pConf, &endPtr);
19✔
679
    pSupp->conf = v;
19✔
680

681
    if (v <= 0 || v > 1.0) {
19!
682
      pSupp->conf = ANALY_FORECAST_DEFAULT_CONF;
×
683
      qWarn("%s valid conf range is (0, 1], user specified:%.2f out of range, set the default:%.2f", id, v,
×
684
             pSupp->conf);
685
    } else {
686
      qDebug("%s forecast conf:%.2f", id, pSupp->conf);
19!
687
    }
688
  } else {
689
    qDebug("%s forecast conf not found:%s, use default:%.2f", id, pSupp->algoOpt, pSupp->conf);
3!
690
  }
691

692
  // extract the start timestamp
693
  char* pStart = taosHashGet(pHashMap, ALGO_OPT_START_NAME, strlen(ALGO_OPT_START_NAME));
22✔
694
  if (pStart != NULL) {
22✔
695
    int64_t v = 0;
2✔
696
    code = toInteger(pStart, taosHashGetValueSize(pStart), 10, &v);
2✔
697
    pSupp->startTs = v;
2✔
698
    pSupp->setStart = 1;
2✔
699
    qDebug("%s forecast set start ts:%"PRId64, id, pSupp->startTs);
2!
700
  }
701

702
  // extract the time step
703
  char* pEvery = taosHashGet(pHashMap, ALGO_OPT_EVERY_NAME, strlen(ALGO_OPT_EVERY_NAME));
22✔
704
  if (pEvery != NULL) {
22✔
705
    int64_t v = 0;
3✔
706
    code = toInteger(pEvery, taosHashGetValueSize(pEvery), 10, &v);
3✔
707
    pSupp->every = v;
3✔
708
    pSupp->setEvery = 1;
3✔
709
    qDebug("%s forecast set every ts:%"PRId64, id, pSupp->every);
3!
710
  }
711

712
  // extract the dynamic real feature for covariate forecasting
713
  void*       pIter = NULL;
22✔
714
  size_t      keyLen = 0;
22✔
715
  const char* p = "dynamic_real_";
22✔
716

717
  while ((pIter = taosHashIterate(pHashMap, pIter))) {
84✔
718
    const char* pVal = pIter;
62✔
719
    char*       pKey = taosHashGetKey((void*)pVal, &keyLen);
62✔
720
    int32_t     idx = 0;
62✔
721
    char        nameBuf[512] = {0};
62✔
722

723
    if (strncmp(pKey, p, strlen(p)) == 0) {
62!
724

725
      if (strncmp(&pKey[keyLen - 4], "_col", 4) == 0) {
×
726
        continue;
×
727
      }
728

729
      int32_t ret = sscanf(pKey, "dynamic_real_%d", &idx);
×
730
      if (ret == 0) {
×
731
        continue;
×
732
      }
733

734
      memcpy(nameBuf, pKey, keyLen);
×
735
      strncpy(&nameBuf[keyLen], "_col", strlen("_col"));
×
736

737
      void* pCol = taosHashGet(pHashMap, nameBuf, strlen(nameBuf));
×
738
      if (pCol == NULL) {
×
739
        qError("%s dynamic real column related:%s column name:%s not specified", id, pKey, nameBuf);
×
740
        code = TSDB_CODE_ANA_INTERNAL_ERROR;
×
741
        goto _end;
×
742
      } else {
743
        // build dynamic_real_feature
744
        SColFutureData d = {.pName = taosStrndupi(pCol, taosHashGetValueSize(pCol))};
×
745
        if (d.pName == NULL) {
×
746
          qError("%s failed to clone the future dynamic real column name:%s", id, (char*) pCol);
×
747
          code = terrno;
×
748
          goto _end;
×
749
        }
750

751
        int32_t index = -1;
×
752
        for (int32_t i = 0; i < taosArrayGetSize(pSupp->pCovariateSlotList); ++i) {
×
753
          SColumn* pColx = taosArrayGet(pSupp->pCovariateSlotList, i);
×
754
          if (strcmp(pColx->name, d.pName) == 0) {
×
755
            index = i;
×
756
            break;
×
757
          }
758
        }
759

760
        if (index == -1) {
×
761
          qError("%s not found the required future dynamic real column:%s", id, d.pName);
×
762
          code = TSDB_CODE_ANA_INTERNAL_ERROR;
×
763
          goto _end;
×
764
        }
765

766
        SColumn* pColx = taosArrayGet(pSupp->pCovariateSlotList, index);
×
767
        d.data.info.slotId = pColx->slotId;
×
768
        d.data.info.type = pColx->type;
×
769
        d.data.info.bytes = pColx->bytes;
×
770

771
        char*   buf = taosStrndup(pVal, taosHashGetValueSize((void*)pVal));
×
772
        int32_t unused = strdequote((char*)buf);
×
773

774
        int32_t num = 0;
×
775
        char**  pList = strsplit(buf, " ", &num);
×
776
        if (num != pSupp->forecastRows) {
×
777
          qError("%s the rows:%d of future dynamic real column data is not equalled to the forecasting rows:%" PRId64,
×
778
                 id, num, pSupp->forecastRows);
779
          code = TSDB_CODE_ANA_INTERNAL_ERROR;
×
780
          goto _end;
×
781
        }
782

783
        d.numOfRows = num;
×
784

785
        colInfoDataEnsureCapacity(&d.data, num, true);
×
786
        for (int32_t j = 0; j < num; ++j) {
×
787
          char* ps = NULL;
×
788
          if (j == 0) {
×
789
            ps = strstr(pList[j], "[") + 1;
×
790
          } else {
791
            ps = pList[j];
×
792
          }
793

794
          switch(pColx->type) {
×
795
            case TSDB_DATA_TYPE_TINYINT: {
×
796
              int8_t t1 = taosStr2Int8(ps, NULL, 10);
×
797
              colDataSetVal(&d.data, j, (const char*)&t1, false);
×
798
              break;
×
799
            }
800
            case TSDB_DATA_TYPE_SMALLINT: {
×
801
              int16_t t1 = taosStr2Int16(ps, NULL, 10);
×
802
              colDataSetVal(&d.data, j, (const char*)&t1, false);
×
803
              break;
×
804
            }
805
            case TSDB_DATA_TYPE_INT: {
×
806
              int32_t t1 = taosStr2Int32(ps, NULL, 10);
×
807
              colDataSetVal(&d.data, j, (const char*)&t1, false);
×
808
              break;
×
809
            }
810
            case TSDB_DATA_TYPE_BIGINT: {
×
811
              int64_t t1 = taosStr2Int64(ps, NULL, 10);
×
812
              colDataSetVal(&d.data, j, (const char*)&t1, false);
×
813
              break;
×
814
            }
815
            case TSDB_DATA_TYPE_FLOAT: {
×
816
              float t1 = taosStr2Float(ps, NULL);
×
817
              colDataSetVal(&d.data, j, (const char*)&t1, false);
×
818
              break;
×
819
            }
820
            case TSDB_DATA_TYPE_DOUBLE: {
×
821
              double t1 = taosStr2Double(ps, NULL);
×
822
              colDataSetVal(&d.data, j, (const char*)&t1, false);
×
823
              break;
×
824
            }
825
            case TSDB_DATA_TYPE_UTINYINT: {
×
826
              uint8_t t1 = taosStr2UInt8(ps, NULL, 10);
×
827
              colDataSetVal(&d.data, j, (const char*)&t1, false);
×
828
              break;
×
829
            }
830
            case TSDB_DATA_TYPE_USMALLINT: {
×
831
              uint16_t t1 = taosStr2UInt16(ps, NULL, 10);
×
832
              colDataSetVal(&d.data, j, (const char*)&t1, false);
×
833
              break;
×
834
            }
835
            case TSDB_DATA_TYPE_UINT: {
×
836
              uint32_t t1 = taosStr2UInt32(ps, NULL, 10);
×
837
              colDataSetVal(&d.data, j, (const char*)&t1, false);
×
838
              break;
×
839
            }
840
            case TSDB_DATA_TYPE_UBIGINT: {
×
841
              uint64_t t1 = taosStr2UInt64(ps, NULL, 10);
×
842
              colDataSetVal(&d.data, j, (const char*)&t1, false);
×
843
              break;
×
844
            }
845
          }
846

847
        }
848

849
        void* noret = taosArrayPush(pSupp->pDynamicRealList, &d);
×
850
        if (noret == NULL) {
×
851
          qError("%s failed to add column info in dynamic real column info", id);
×
852
          code = terrno;
×
853
          goto _end;
×
854
        }
855
      }
856
    }
857
  }
858

859
_end:
22✔
860
  taosHashCleanup(pHashMap);
23✔
861
  return code;
23✔
862
}
863

864
/*
 * Open the analytic buffer of the forecast operator and write its column layout.
 *
 * Column metadata is emitted in this order:
 *   1. "ts"  (timestamp) and "val" (the target column, typed by pSupp->targetValType);
 *   2. one "dynamic_real_<n>" entry for every element of pDynamicRealList whose name
 *      matches a column in pCovariateSlotList; the matched column is copied into the
 *      element and removed from pCovariateSlotList;
 *   3. one "past_dynamic_real_<n>" entry for every column left in pCovariateSlotList.
 * Steps 2 and 3 only run when there are at least as many covariate columns as
 * dynamic-real entries.
 *
 * Afterwards the data section is started and a column is opened for each of the
 * pSupp->numOfInputCols input columns.
 *
 * Returns 0 on success. On any failure the buffer is closed and destroyed here and
 * the failing code is returned.
 */
static int32_t forecastCreateBuf(SForecastSupp* pSupp) {
  SAnalyticBuf* pAnalyBuf = &pSupp->analyBuf;
  int64_t       fileTs = 0;  // taosGetTimestampMs();
  int32_t       nextCol = 0;
  int32_t       code = 0;
  int32_t       numOfFuture = 0;
  int32_t       numOfPast = 0;

  pAnalyBuf->bufType = ANALYTICS_BUF_TYPE_JSON_COL;
  snprintf(pAnalyBuf->fileName, sizeof(pAnalyBuf->fileName), "%s/tdengine-forecast-%" PRId64, tsTempDir, fileTs);

  // "ts" + "val" + every covariate column
  int32_t totalCols = taosArrayGetSize(pSupp->pCovariateSlotList) + 2;

  code = tsosAnalyBufOpen(pAnalyBuf, totalCols);
  if (code != 0) goto _OVER;

  code = taosAnalyBufWriteColMeta(pAnalyBuf, nextCol++, TSDB_DATA_TYPE_TIMESTAMP, "ts");
  if (code != 0) goto _OVER;

  code = taosAnalyBufWriteColMeta(pAnalyBuf, nextCol++, pSupp->targetValType, "val");
  if (code != 0) goto _OVER;

  numOfFuture = taosArrayGetSize(pSupp->pDynamicRealList);
  numOfPast = taosArrayGetSize(pSupp->pCovariateSlotList);

  if (numOfPast >= numOfFuture) {
    // columns that come with caller-provided future values
    for (int32_t futureIdx = 0; futureIdx < numOfFuture; ++futureIdx) {
      SColFutureData* pFuture = taosArrayGet(pSupp->pDynamicRealList, futureIdx);

      int32_t slot = 0;
      while (slot < taosArrayGetSize(pSupp->pCovariateSlotList)) {
        SColumn* pCovCol = taosArrayGet(pSupp->pCovariateSlotList, slot);
        if (strcmp(pCovCol->name, pFuture->pName) != 0) {
          ++slot;
          continue;
        }

        char metaName[128] = {0};
        (void) tsnprintf(metaName, tListLen(metaName), "dynamic_real_%d", futureIdx + 1);

        code = taosAnalyBufWriteColMeta(pAnalyBuf, nextCol++, pCovCol->type, metaName);
        if (code != 0) {
          goto _OVER;
        }

        // keep a private copy of the column, then take it out of the "past only" list
        memcpy(&pFuture->col, pCovCol, sizeof(SColumn));
        taosArrayRemove(pSupp->pCovariateSlotList, slot);
        break;
      }
    }

    // whatever is left has historical values only
    numOfPast = taosArrayGetSize(pSupp->pCovariateSlotList);
    for (int32_t pastIdx = 0; pastIdx < numOfPast; ++pastIdx) {
      SColumn* pPastCol = taosArrayGet(pSupp->pCovariateSlotList, pastIdx);

      char metaName[128] = {0};
      (void)tsnprintf(metaName, tListLen(metaName), "past_dynamic_real_%d", pastIdx + 1);

      code = taosAnalyBufWriteColMeta(pAnalyBuf, nextCol++, pPastCol->type, metaName);
      if (code) {
        goto _OVER;
      }
    }
  }

  code = taosAnalyBufWriteDataBegin(pAnalyBuf);
  if (code != 0) goto _OVER;

  for (int32_t inputCol = 0; inputCol < pSupp->numOfInputCols; ++inputCol) {
    code = taosAnalyBufWriteColBegin(pAnalyBuf, inputCol);
    if (code != 0) goto _OVER;
  }

_OVER:
  if (code == 0) {
    return code;
  }

  // failure: release the partially initialized buffer before reporting the error
  (void)taosAnalyBufClose(pAnalyBuf);
  taosAnalyBufDestroy(pAnalyBuf);
  return code;
}
936

937
/**
 * Create the forecast operator.
 *
 * Allocates the operator and its SForecastOperatorInfo, builds the expression
 * supports from the physical plan node, parses the forecast input/output/options,
 * creates the analytic buffer and the result block, and finally attaches the
 * downstream operator.
 *
 * @param downstream  downstream operator that is appended to the new operator
 * @param pPhyNode    physical plan node; used as a SForecastFuncPhysiNode
 * @param pTaskInfo   task the operator belongs to; its id string is used for logging
 *                    and its code field receives the error code on failure
 * @param pOptrInfo   [out] receives the created operator on success
 *
 * @return TSDB_CODE_SUCCESS on success. On failure the error code is returned, the
 *         partially built info is destroyed, and the operator together with the
 *         downstream is handed to destroyOperatorAndDownstreams().
 */
int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
                                   SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t code = 0;
  int32_t lino = 0;

  // Must be initialized before the first `goto _error`: the error path logs pId, and a
  // goto that jumps over the initializer would leave it with an indeterminate value.
  const char* pId = pTaskInfo->id.str;

  SForecastOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SForecastOperatorInfo));
  SOperatorInfo*         pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pOperator == NULL || pInfo == NULL) {
    code = terrno;
    goto _error;
  }

  SForecastSupp*          pSupp = &pInfo->forecastSupp;
  SForecastFuncPhysiNode* pForecastPhyNode = (SForecastFuncPhysiNode*)pPhyNode;
  SExprSupp*              pExprSup = &pOperator->exprSupp;
  int32_t                 numOfExprs = 0;
  SExprInfo*              pExprInfo = NULL;

  // expressions of the forecast function(s)
  code = createExprInfo(pForecastPhyNode->pFuncs, NULL, &pExprInfo, &numOfExprs);
  QUERY_CHECK_CODE(code, lino, _error);

  code = initExprSupp(pExprSup, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore);
  QUERY_CHECK_CODE(code, lino, _error);

  // optional scalar expressions attached to the plan node
  if (pForecastPhyNode->pExprs != NULL) {
    int32_t    num = 0;
    SExprInfo* pScalarExprInfo = NULL;
    code = createExprInfo(pForecastPhyNode->pExprs, NULL, &pScalarExprInfo, &num);
    QUERY_CHECK_CODE(code, lino, _error);

    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, num, &pTaskInfo->storageAPI.functionStore);
    QUERY_CHECK_CODE(code, lino, _error);
  }

  code = filterInitFromNode((SNode*)pForecastPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  QUERY_CHECK_CODE(code, lino, _error);

  code = forecastParseInput(pSupp, pForecastPhyNode->pFuncs, pId);
  QUERY_CHECK_CODE(code, lino, _error);

  code = forecastParseOutput(pSupp, pExprSup);
  QUERY_CHECK_CODE(code, lino, _error);

  code = forecastParseOpt(pSupp, pId);
  QUERY_CHECK_CODE(code, lino, _error);

  code = forecastCreateBuf(pSupp);
  QUERY_CHECK_CODE(code, lino, _error);

  initResultSizeInfo(&pOperator->resultInfo, 4096);

  pInfo->pRes = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
  QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);

  setOperatorInfo(pOperator, "ForecastOperator", QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, forecastNext, NULL, destroyForecastInfo, optrDefaultBufFn,
                                         NULL, optrDefaultGetNextExtFn, NULL);

  code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _error);

  code = appendDownstream(pOperator, &downstream, 1);
  QUERY_CHECK_CODE(code, lino, _error);

  *pOptrInfo = pOperator;

  qDebug("%s forecast env is initialized, option:%s", pId, pSupp->algoOpt);
  return TSDB_CODE_SUCCESS;

_error:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s %s failed at line %d since %s", pId, __func__, lino, tstrerror(code));
  }
  if (pInfo != NULL) destroyForecastInfo(pInfo);
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
  pTaskInfo->code = code;
  return code;
}
1018

1019
static void destroyForecastInfo(void* param) {
23✔
1020
  SForecastOperatorInfo* pInfo = (SForecastOperatorInfo*)param;
23✔
1021

1022
  blockDataDestroy(pInfo->pRes);
23✔
1023
  pInfo->pRes = NULL;
23✔
1024

1025
  taosArrayDestroy(pInfo->forecastSupp.pCovariateSlotList);
23✔
1026
  pInfo->forecastSupp.pCovariateSlotList = NULL;
23✔
1027

1028
  cleanupExprSupp(&pInfo->scalarSup);
23✔
1029
  taosAnalyBufDestroy(&pInfo->forecastSupp.analyBuf);
23✔
1030
  taosMemoryFreeClear(param);
23!
1031
}
23✔
1032

1033
#else
1034

1035
// Stub used when USE_ANALYTICS is not defined: no operator is created and the
// arguments are left untouched; the call always reports "operation not supported".
int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
                                   SOperatorInfo** pOptrInfo) {
  const int32_t code = TSDB_CODE_OPS_NOT_SUPPORT;
  return code;
}
1039

1040
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc