• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4874

04 Dec 2025 01:55AM UTC coverage: 64.623% (+0.07%) from 64.558%
#4874

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

865 of 2219 new or added lines in 36 files covered. (38.98%)

6317 existing lines in 143 files now uncovered.

159543 of 246882 relevant lines covered (64.62%)

106415537.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/libs/executor/src/imputationoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "osMemPool.h"
21
#include "osMemory.h"
22
#include "querytask.h"
23
#include "tanalytics.h"
24
#include "taoserror.h"
25
#include "tcommon.h"
26
#include "tdatablock.h"
27
#include "tdef.h"
28
#include "thash.h"
29
#include "tjson.h"
30
#include "tmsg.h"
31
#include "types.h"
32

33
#ifdef USE_ANALYTICS
34

35
// Names of the user-supplied analysis options.
// NOTE(review): presumably looked up by parseFreq()/parseRadius() in the parsed option map —
// confirm against those definitions, which are outside this section.
#define FREQ_STR "freq"
#define RADIUS_STR "radius"
#define LAGSTART_STR "lag_start"
#define LAGEND_STR "lag_end"
39

40
typedef struct {
41
  int64_t      timeout;
42
  int8_t       wncheck;
43
  SAnalyticBuf analyBuf;
44
  int32_t      numOfRows;
45
  int32_t      numOfBlocks;
46
  int32_t      numOfCols;
47
  STimeWindow  win;
48
  uint64_t     groupId;
49
  SArray      *pBlocks;    // SSDataBlock*
50
} SBaseSupp;
51

52
typedef struct {
53
  SBaseSupp    base;
54
  int32_t      targetSlot;
55
  int32_t      targetType;
56
  int32_t      tsSlot;
57
  int32_t      tsPrecision;
58
  int32_t      resTsSlot;
59
  int32_t      resMarkSlot;
60
  char         freq[64];         // frequency of data
61
} SImputationSupp;
62

63
typedef struct {
64
  int32_t   radius;
65
  int32_t   lagStart;
66
  int32_t   lagEnd;
67
  SBaseSupp base;
68

69
  int32_t targetSlot1;
70
  int32_t targetSlot2;
71

72
  int32_t targetSlot1Type;
73
  int32_t targetSlot2Type;
74
} SCorrelationSupp;
75

76
typedef struct {
77
  SOptrBasicInfo   binfo;
78
  SExprSupp        scalarSup;
79
  char             algoName[TSDB_ANALYTIC_ALGO_NAME_LEN];
80
  char             algoUrl[TSDB_ANALYTIC_ALGO_URL_LEN];
81
  char*            options;
82
  SColumn          targetCol;
83
  int32_t          resTargetSlot;
84
  int32_t          analysisType;
85
  SImputationSupp  imputatSup;
86
  SCorrelationSupp corrSupp;
87
} SAnalysisOperatorInfo;
88

89
static void    analysisDestroyOperatorInfo(void* param);
90
static int32_t imputationNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
91
static int32_t doAnalysis(SAnalysisOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo);
92
static int32_t doCacheBlock(SAnalysisOperatorInfo* pInfo, SSDataBlock* pBlock, const char* id);
93
static int32_t doParseInputForImputation(SAnalysisOperatorInfo* pInfo, SImputationSupp* pSupp, SNodeList* pFuncs, const char* id);
94
static int32_t doParseInputForDtw(SAnalysisOperatorInfo* pInfo, SCorrelationSupp* pSupp, SNodeList* pFuncs, const char* id);
95
static int32_t doParseInputForTlcc(SAnalysisOperatorInfo* pInfo, SCorrelationSupp* pSupp, SNodeList* pFuncs, const char* id);
96
static int32_t doSetResSlot(SAnalysisOperatorInfo* pInfo, SExprSupp* pExprSup);
97
static int32_t doParseOption(SAnalysisOperatorInfo* pInfo, const char* id);
98
static int32_t doCreateBuf(SAnalysisOperatorInfo* pInfo, const char* pId);
99
static int32_t estResultRowsAfterImputation(int32_t rows, int64_t skey, int64_t ekey, int32_t prec, const char* pFreq, const char* id);
100
static void    doInitImputOptions(SImputationSupp* pSupp);
101
static void    doInitDtwOptions(SCorrelationSupp* pSupp);
102
static int32_t parseFreq(SImputationSupp* pSupp, SHashObj* pHashMap, const char* id);
103
static void    parseRadius(SCorrelationSupp* pSupp, SHashObj* pHashMap, const char* id);
104

105
/**
 * Create the operator that serves the imputation, DTW, DTW-path and TLCC analysis functions.
 *
 * The analysis type is taken from the first expression whose function type is one of those
 * four; a plan node that carries none of them is rejected with TSDB_CODE_INVALID_PARA.
 *
 * @param downstream  operator that supplies the input blocks; attached with appendDownstream()
 * @param physiNode   physical plan node, read as an SGenericAnalysisPhysiNode
 * @param pTaskInfo   owning task; its code field is set to the error code when creation fails
 * @param pOptrInfo   receives the new operator on success
 * @return TSDB_CODE_SUCCESS, or the error code of the step that failed
 */
int32_t createGenericAnalysisOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo,
                                          SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    lino = 0;
  int32_t                    num = 0;
  SExprInfo*                 pExprInfo = NULL;
  int32_t                    numOfExprs = 0;
  const char*                id = GET_TASKID(pTaskInfo);
  SGenericAnalysisPhysiNode* pAnalysisNode = (SGenericAnalysisPhysiNode*)physiNode;
  SExprSupp*                 pExprSup = NULL;

  SAnalysisOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAnalysisOperatorInfo));
  SOperatorInfo*         pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pOperator == NULL || pInfo == NULL) {
    code = terrno;
    goto _error;
  }

  pExprSup = &pOperator->exprSupp;

  code = createExprInfo(pAnalysisNode->pFuncs, NULL, &pExprInfo, &numOfExprs);
  QUERY_CHECK_CODE(code, lino, _error);

  code = initExprSupp(pExprSup, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore);
  QUERY_CHECK_CODE(code, lino, _error);

  if (pAnalysisNode->pExprs != NULL) {
    SExprInfo* pScalarExprInfo = NULL;
    code = createExprInfo(pAnalysisNode->pExprs, NULL, &pScalarExprInfo, &num);
    QUERY_CHECK_CODE(code, lino, _error);

    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, num, &pTaskInfo->storageAPI.functionStore);
    QUERY_CHECK_CODE(code, lino, _error);
  }

  code = filterInitFromNode((SNode*)pAnalysisNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
                            pTaskInfo->pStreamRuntimeInfo);
  QUERY_CHECK_CODE(code, lino, _error);

  // the first analysis function found decides which kind of analysis this operator performs
  for (int32_t i = 0; i < numOfExprs; ++i) {
    int32_t type = pExprInfo[i].pExpr->_function.functionType;
    if (type == FUNCTION_TYPE_IMPUTATION || type == FUNCTION_TYPE_DTW || type == FUNCTION_TYPE_DTW_PATH ||
        type == FUNCTION_TYPE_TLCC) {
      pInfo->analysisType = type;
      break;
    }
  }

  if (pInfo->analysisType == FUNCTION_TYPE_IMPUTATION) {
    doInitImputOptions(&pInfo->imputatSup);
    code = doParseInputForImputation(pInfo, &pInfo->imputatSup, pAnalysisNode->pFuncs, id);
    QUERY_CHECK_CODE(code, lino, _error);
  } else if (pInfo->analysisType == FUNCTION_TYPE_DTW || pInfo->analysisType == FUNCTION_TYPE_DTW_PATH) {
    doInitDtwOptions(&pInfo->corrSupp);
    code = doParseInputForDtw(pInfo, &pInfo->corrSupp, pAnalysisNode->pFuncs, id);
    QUERY_CHECK_CODE(code, lino, _error);
  } else if (pInfo->analysisType == FUNCTION_TYPE_TLCC) {
    doInitDtwOptions(&pInfo->corrSupp);
    code = doParseInputForTlcc(pInfo, &pInfo->corrSupp, pAnalysisNode->pFuncs, id);
    QUERY_CHECK_CODE(code, lino, _error);
  } else {
    // no supported analysis function in the plan node; record the line so the log is useful
    code = TSDB_CODE_INVALID_PARA;
    lino = __LINE__;
    goto _error;
  }

  code = doParseOption(pInfo, id);
  QUERY_CHECK_CODE(code, lino, _error);

  code = doSetResSlot(pInfo, pExprSup);
  QUERY_CHECK_CODE(code, lino, _error);

  code = doCreateBuf(pInfo, id);
  QUERY_CHECK_CODE(code, lino, _error);

  initResultSizeInfo(&pOperator->resultInfo, 4096);

  pInfo->binfo.pRes = createDataBlockFromDescNode(physiNode->pOutputDataBlockDesc);
  QUERY_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);

  setOperatorInfo(pOperator, "GenericAnalyOptr", QUERY_NODE_PHYSICAL_PLAN_ANALYSIS_FUNC, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, imputationNext, NULL, analysisDestroyOperatorInfo,
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);

  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _error);

  code = appendDownstream(pOperator, &downstream, 1);
  QUERY_CHECK_CODE(code, lino, _error);

  *pOptrInfo = pOperator;

  qDebug("%s analysis env is initialized, option:%s", id, pInfo->options);
  return TSDB_CODE_SUCCESS;

_error:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s %s failed at line %d since %s", id, __func__, lino, tstrerror(code));
  }

  if (pInfo != NULL) analysisDestroyOperatorInfo(pInfo);
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
  pTaskInfo->code = code;
  return code;
}
213

214
// Resolve the SBaseSupp that belongs to the operator's analysis type.
// On success *ppSupp points into pInfo; for an unknown type *ppSupp is NULL and
// TSDB_CODE_INVALID_PARA is returned.
static int32_t getBaseSupp(SAnalysisOperatorInfo* pInfo, SBaseSupp** ppSupp, const char* pId) {
  *ppSupp = NULL;

  const int32_t kind = pInfo->analysisType;

  if (kind == FUNCTION_TYPE_IMPUTATION) {
    *ppSupp = &pInfo->imputatSup.base;
    return 0;
  }

  // the three correlation flavours share one support structure
  if (kind == FUNCTION_TYPE_DTW || kind == FUNCTION_TYPE_DTW_PATH || kind == FUNCTION_TYPE_TLCC) {
    *ppSupp = &pInfo->corrSupp.base;
    return 0;
  }

  qError("%s unknown analysis type: %d", pId, pInfo->analysisType);
  return TSDB_CODE_INVALID_PARA;
}
235

236
/**
 * Produce the next result block of the analysis operator.
 *
 * Input blocks are pulled from the downstream operator and cached per group. When a block of
 * a different group arrives, the finished group is analysed first and, if that produced rows,
 * the result is returned right away; the block that triggered the switch stays cached for the
 * next call. Once the downstream is exhausted the last group is analysed.
 *
 * A group id of 0 is treated as "no group seen yet".
 *
 * @param pOperator  this operator
 * @param ppRes      receives the result block, or NULL when no rows were produced
 * @return TSDB_CODE_SUCCESS; on failure the error is stored in the task and T_LONG_JMP is taken
 */
static int32_t imputationNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                code = TSDB_CODE_SUCCESS;
  int32_t                lino = 0;
  int32_t                numOfBlocks = 0;
  SAnalysisOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*         pTaskInfo = pOperator->pTaskInfo;
  SOptrBasicInfo*        pBInfo = &pInfo->binfo;
  SSDataBlock*           pRes = pInfo->binfo.pRes;
  int64_t                st = taosGetTimestampUs();
  const char*            idstr = GET_TASKID(pTaskInfo);
  SBaseSupp*             pSupp = NULL;

  code = getBaseSupp(pInfo, &pSupp, idstr);
  QUERY_CHECK_CODE(code, lino, _end);

  // Resume from the blocks that are already cached: when the previous call returned on a group
  // switch, the first block of the new group was cached but not analysed yet. That count lives
  // in pSupp->numOfBlocks (maintained by the cache functions and cleared by doAnalysis()).
  numOfBlocks = pSupp->numOfBlocks;
  blockDataCleanup(pRes);

  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      break;
    }

    if (pSupp->groupId == 0 || pSupp->groupId == pBlock->info.id.groupId) {
      pSupp->groupId = pBlock->info.id.groupId;
      numOfBlocks++;
      code = doCacheBlock(pInfo, pBlock, idstr);

      qDebug("group:%" PRId64 ", blocks:%d, rows:%" PRId64 ", total rows:%d", pSupp->groupId, numOfBlocks,
             pBlock->info.rows, pSupp->numOfRows);
      QUERY_CHECK_CODE(code, lino, _end);
    } else {
      qDebug("group:%" PRId64 ", read completed for new group coming, blocks:%d", pSupp->groupId, numOfBlocks);
      code = doAnalysis(pInfo, pTaskInfo);
      QUERY_CHECK_CODE(code, lino, _end);

      pSupp->groupId = pBlock->info.id.groupId;
      numOfBlocks = 1;
      qDebug("group:%" PRId64 ", new group, rows:%" PRId64 ", total rows:%d", pSupp->groupId, pBlock->info.rows,
             pSupp->numOfRows);
      code = doCacheBlock(pInfo, pBlock, idstr);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    if (pRes->info.rows > 0) {
      (*ppRes) = pRes;
      qDebug("group:%" PRId64 ", return to upstream, blocks:%d", pRes->info.id.groupId, numOfBlocks);
      return code;
    }
  }

  if (numOfBlocks > 0) {
    qDebug("group:%" PRId64 ", read finish, blocks:%d", pSupp->groupId, numOfBlocks);
    code = doAnalysis(pInfo, pTaskInfo);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  qDebug("%s all groups finished, cost:%" PRId64 "us", idstr, taosGetTimestampUs() - st);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s %s failed at line %d since %s", idstr, __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
  return code;
}
305

306
static void analysisDestroyOperatorInfo(void* param) {
×
307
  SAnalysisOperatorInfo* pInfo = (SAnalysisOperatorInfo*)param;
×
308
  if (pInfo == NULL) return;
×
309

310
  cleanupBasicInfo(&pInfo->binfo);
×
311
  cleanupExprSupp(&pInfo->scalarSup);
×
312

313
  SArray* pBlocks = NULL;
×
314
  if (pInfo->analysisType == FUNCTION_TYPE_IMPUTATION) {
×
315
    pBlocks = pInfo->imputatSup.base.pBlocks;
×
316
  } else {
317
    pBlocks = pInfo->corrSupp.base.pBlocks;
×
318
  }
319

320
  for (int32_t i = 0; i < taosArrayGetSize(pBlocks); ++i) {
×
321
    SSDataBlock* pBlock = taosArrayGetP(pBlocks, i);
×
322
    blockDataDestroy(pBlock);
×
323
  }
324

325
  taosArrayDestroy(pBlocks);
×
326
  taosMemoryFreeClear(param);
×
327
}
328

329
/**
 * Append the (timestamp, value) pairs of one input block to the imputation request buffer.
 *
 * Also tracks the number of cached rows/blocks and the smallest and largest timestamp seen.
 *
 * @param pSupp   imputation state; supplies the column slots and receives the counters
 * @param id      task id used as the log prefix
 * @param pBlock  input block to cache
 * @return 0 on success, TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS when the rows cached before this
 *         block already exceed ANALY_IMPUTATION_INPUT_MAX_ROWS, terrno when an input column
 *         cannot be found, or the error reported by the buffer writer
 */
static int32_t doCacheBlockForImputation(SImputationSupp* pSupp, const char* id, SSDataBlock* pBlock) {
  SAnalyticBuf* pBuf = &pSupp->base.analyBuf;
  int32_t       code = 0;

  if (pSupp->base.numOfRows > ANALY_IMPUTATION_INPUT_MAX_ROWS) {
    qError("%s too many rows for imputation, maximum allowed:%d, input:%d", id, ANALY_IMPUTATION_INPUT_MAX_ROWS,
           pSupp->base.numOfRows);
    return TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS;
  }

  pSupp->base.numOfBlocks++;
  qDebug("%s block:%d, %p rows:%" PRId64, id, pSupp->base.numOfBlocks, pBlock, pBlock->info.rows);

  if (pBlock->info.rows <= 0) {
    return code;
  }

  // the columns do not change from row to row, so resolve them once per block
  SColumnInfoData* pValCol = taosArrayGet(pBlock->pDataBlock, pSupp->targetSlot);
  SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSupp->tsSlot);
  if (pTsCol == NULL || pValCol == NULL) {
    // same convention as doAnalysisImpl(): report the lookup failure instead of dropping the rows
    qError("%s failed to get input columns for imputation, ts slot:%d, target slot:%d", id, pSupp->tsSlot,
           pSupp->targetSlot);
    return terrno;
  }

  int16_t valType = pValCol->info.type;

  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
    int64_t ts = ((TSKEY*)pTsCol->pData)[j];
    char*   val = colDataGetData(pValCol, j);

    pSupp->base.win.skey = MIN(pSupp->base.win.skey, ts);
    pSupp->base.win.ekey = MAX(pSupp->base.win.ekey, ts);

    pSupp->base.numOfRows++;

    // buffer column 0: the primary timestamp
    code = taosAnalyBufWriteColData(pBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, &ts);
    if (TSDB_CODE_SUCCESS != code) {
      qError("%s failed to write ts in buf, code:%s", id, tstrerror(code));
      return code;
    }

    // buffer column 1: the value to impute
    code = taosAnalyBufWriteColData(pBuf, 1, valType, val);
    if (TSDB_CODE_SUCCESS != code) {
      qError("%s failed to write val in buf, code:%s", id, tstrerror(code));
      return code;
    }
  }

  return code;
}
377

NEW
378
/**
 * Append the value pairs of one input block to the correlation (DTW / DTW path / TLCC)
 * request buffer and update the cached row/block counters.
 *
 * @param pSupp   correlation state; supplies the two column slots and receives the counters
 * @param id      task id used as the log prefix
 * @param pBlock  input block to cache
 * @return 0 on success, TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS when the cached rows (including this
 *         block) exceed ANALY_CORRELATION_INPUT_MAX_ROWS, terrno when an input column cannot
 *         be found, or the error reported by the buffer writer
 */
int32_t doCacheBlockForCorrelation(SCorrelationSupp* pSupp, const char* id, SSDataBlock* pBlock) {
  SAnalyticBuf* pBuf = &pSupp->base.analyBuf;
  int32_t       code = 0;

  pSupp->base.numOfBlocks++;
  qDebug("%s block:%d, %p rows:%" PRId64, id, pSupp->base.numOfBlocks, pBlock, pBlock->info.rows);

  if (pBlock->info.rows > 0) {
    // the columns do not change from row to row, so resolve them once per block
    SColumnInfoData* pCol1 = taosArrayGet(pBlock->pDataBlock, pSupp->targetSlot1);
    SColumnInfoData* pCol2 = taosArrayGet(pBlock->pDataBlock, pSupp->targetSlot2);
    if (pCol1 == NULL || pCol2 == NULL) {
      // same convention as doAnalysisImpl(): report the lookup failure instead of dropping the rows
      qError("%s failed to get input columns for correlation, slot1:%d, slot2:%d", id, pSupp->targetSlot1,
             pSupp->targetSlot2);
      return terrno;
    }

    for (int32_t j = 0; j < pBlock->info.rows; ++j) {
      char* val1 = colDataGetData(pCol1, j);
      char* val2 = colDataGetData(pCol2, j);

      pSupp->base.numOfRows++;

      // buffer column 0: value of the first series
      code = taosAnalyBufWriteColData(pBuf, 0, pCol1->info.type, val1);
      if (TSDB_CODE_SUCCESS != code) {
        qError("%s failed to write col1 in buf, code:%s", id, tstrerror(code));
        return code;
      }

      // buffer column 1: value of the second series
      code = taosAnalyBufWriteColData(pBuf, 1, pCol2->info.type, val2);
      if (TSDB_CODE_SUCCESS != code) {
        qError("%s failed to write col2 in buf, code:%s", id, tstrerror(code));
        return code;
      }
    }
  }

  // the limit is enforced after the block has been written
  if (pSupp->base.numOfRows > ANALY_CORRELATION_INPUT_MAX_ROWS) {
    qError("%s too many rows for correlation, maximum allowed:%d, input:%d", id, ANALY_CORRELATION_INPUT_MAX_ROWS,
           pSupp->base.numOfRows);
    return TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS;
  }

  return code;
}
422

UNCOV
423
// Route one input block to the cache routine of the operator's analysis type.
// Any other analysis type leaves the block untouched and reports success.
static int32_t doCacheBlock(SAnalysisOperatorInfo* pInfo, SSDataBlock* pBlock, const char* id) {
  switch (pInfo->analysisType) {
    case FUNCTION_TYPE_IMPUTATION:
      return doCacheBlockForImputation(&pInfo->imputatSup, id, pBlock);

    case FUNCTION_TYPE_DTW:
    case FUNCTION_TYPE_DTW_PATH:
    case FUNCTION_TYPE_TLCC:
      return doCacheBlockForCorrelation(&pInfo->corrSupp, id, pBlock);

    default:
      return TSDB_CODE_SUCCESS;
  }
}
435

UNCOV
436
// Finish the request buffer once all input rows of a group are cached: close every column and
// the data section, append the options ("option", "algo", plus "freq" and "prec" for
// imputation, then the white-noise-check flag) and close the buffer.
// For imputation the cached row count is checked against ANALY_IMPUTATION_INPUT_MIN_ROWS and
// estResultRowsAfterImputation() is consulted before anything is written.
// Returns 0 on success or the first error encountered.
static int32_t finishBuildRequest(SAnalysisOperatorInfo* pInfo, SBaseSupp* pSupp, const char* id) {
  SAnalyticBuf* pBuf = &pSupp->analyBuf;
  const bool    isImputation = (pInfo->analysisType == FUNCTION_TYPE_IMPUTATION);
  int32_t       code = 0;

  // let's check existed rows for imputation
  if (isImputation) {
    if (pSupp->numOfRows < ANALY_IMPUTATION_INPUT_MIN_ROWS) {
      qError("%s input rows for imputation aren't enough, min required:%d, current:%d", id, ANALY_IMPUTATION_INPUT_MIN_ROWS,
             pSupp->numOfRows);
      return TSDB_CODE_ANA_ANODE_NOT_ENOUGH_ROWS;
    }

    code = estResultRowsAfterImputation(pSupp->numOfRows, pSupp->win.skey, pSupp->win.ekey,
                                        pInfo->imputatSup.tsPrecision, pInfo->imputatSup.freq, id);
    if (code != 0) {
      return code;
    }
  }

  for (int32_t col = 0; col < pBuf->numOfCols; ++col) {
    code = taosAnalyBufWriteColEnd(pBuf, col);
    if (code != 0) {
      return code;
    }
  }

  code = taosAnalyBufWriteDataEnd(pBuf);
  if (code != 0) {
    return code;
  }

  code = taosAnalyBufWriteOptStr(pBuf, "option", pInfo->options);
  if (code != 0) {
    return code;
  }

  code = taosAnalyBufWriteOptStr(pBuf, "algo", pInfo->algoName);
  if (code != 0) {
    return code;
  }

  if (isImputation) {
    // translate the timestamp precision into its textual form; milliseconds is the fallback
    const int32_t precision = pInfo->imputatSup.tsPrecision;
    const char*   precStr = TSDB_TIME_PRECISION_MILLI_STR;

    if (precision == TSDB_TIME_PRECISION_MICRO) {
      precStr = TSDB_TIME_PRECISION_MICRO_STR;
    } else if (precision == TSDB_TIME_PRECISION_NANO) {
      precStr = TSDB_TIME_PRECISION_NANO_STR;
    } else if (precision == TSDB_TIME_PRECISION_SECONDS) {
      precStr = "s";
    }

    code = taosAnalyBufWriteOptStr(pBuf, "freq", pInfo->imputatSup.freq);
    if (code != 0) {
      return code;
    }

    code = taosAnalyBufWriteOptStr(pBuf, "prec", precStr);
    if (code != 0) {
      return code;
    }
  }

  code = taosAnalyBufWriteOptInt(pBuf, ALGO_OPT_WNCHECK_NAME, pSupp->wncheck);
  if (code != 0) {
    return code;
  }

  return taosAnalyBufClose(pBuf);
}
500

UNCOV
501
/**
 * Format one DTW path element, a JSON array of exactly two integers, as the var-string
 * "(a, b)" in buf.
 *
 * @param pathJson       JSON array holding the two indexes of the path element
 * @param pResTargetCol  not used by this function
 * @param pId            task id used as the log prefix
 * @param buf            output buffer; receives the var-string header followed by the text
 * @param bufSize        total size of buf in bytes, header included
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when buf cannot hold any text, or
 *         TSDB_CODE_ANA_ANODE_RETURN_ERROR when the element is malformed or does not fit
 */
static int32_t buildDtwPathResult(SJson* pathJson, SColumnInfoData* pResTargetCol, const char* pId, char* buf,
                                  int32_t bufSize) {
  // room left for the text once the var-string header is accounted for
  if (buf == NULL || bufSize <= (int32_t)VARSTR_HEADER_SIZE) {
    qError("%s invalid buffer for path result", pId);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t avail = bufSize - (int32_t)VARSTR_HEADER_SIZE;

  int32_t pair = tjsonGetArraySize(pathJson);
  if (pair != 2) {
    qError("%s invalid path data, should be array of pair", pId);
    return TSDB_CODE_ANA_ANODE_RETURN_ERROR;
  }

  SJson* first = tjsonGetArrayItem(pathJson, 0);
  SJson* second = tjsonGetArrayItem(pathJson, 1);
  if (first == NULL || second == NULL) {
    qError("%s invalid path data, should be array of pair", pId);
    return TSDB_CODE_ANA_ANODE_RETURN_ERROR;
  }

  int64_t t1 = 0;
  int64_t t2 = 0;
  tjsonGetObjectValueBigInt(first, &t1);
  tjsonGetObjectValueBigInt(second, &t2);

  // snprintf returns the length it wanted to write; n >= avail means the text was truncated
  // and must not be recorded as the var-string length
  int32_t n = snprintf(varDataVal(buf), (size_t)avail, "(%" PRId64 ", %" PRId64 ")", t1, t2);
  if (n <= 0 || n >= avail) {
    return TSDB_CODE_ANA_ANODE_RETURN_ERROR;
  }

  varDataSetLen(buf, n);
  return TSDB_CODE_SUCCESS;
}
524

UNCOV
525
/**
 * Send the prepared request to the analysis service and append its answer to pBlock.
 *
 * Result rows are written starting at pBlock->info.rows:
 *  - imputation: one row per element of "target" (plus "ts" / "mask" when those output slots
 *    are set),
 *  - DTW: a single row holding "distance",
 *  - DTW path: one "(a, b)" var-string row per element of "path",
 *  - TLCC: one "(lag, value)" var-string row per element of "lags" / "ccf_vals".
 *
 * @param pInfo   operator state (analysis type, service url, output slots)
 * @param pSupp   support structure holding the request buffer and the timeout
 * @param pBlock  result block to append to
 * @param pId     task id used as the log prefix
 * @return 0 on success; terrno when the result column or the response is unavailable; the
 *         error parsed from the response when the service reports a negative row count;
 *         TSDB_CODE_INVALID_JSON_FORMAT (or the failing step's code) for a malformed response
 */
static int32_t doAnalysisImpl(SAnalysisOperatorInfo* pInfo, SBaseSupp* pSupp, SSDataBlock* pBlock, const char* pId) {
  SAnalyticBuf* pBuf = &pSupp->analyBuf;
  int32_t       resCurRow = pBlock->info.rows;
  int32_t       needRows = 0;
  int64_t       tmpI64 = 0;
  double        tmpDouble = 0;
  int32_t       code = 0;

  SColumnInfoData* pResTargetCol = taosArrayGet(pBlock->pDataBlock, pInfo->resTargetSlot);
  if (NULL == pResTargetCol) {
    return terrno;
  }

  SJson* pJson = taosAnalySendReqRetJson(pInfo->algoUrl, ANALYTICS_HTTP_TYPE_POST, pBuf, pSupp->timeout, pId);
  if (pJson == NULL) {
    return terrno;
  }

  int32_t rows = 0;
  tjsonGetInt32ValueFromDouble(pJson, "rows", rows, code);
  if (rows < 0 && code == 0) {
    // a negative row count is how the service signals an error; the details are in the response
    code = parseErrorMsgFromAnalyticServer(pJson, pId);
    tjsonDelete(pJson);
    return code;
  }

  if (code < 0) {
    goto _OVER;
  }

  // The row count comes from the service, so make sure the block can hold the rows before
  // anything is written. DTW reports the path length in "rows" but emits a single row.
  needRows = (pInfo->analysisType == FUNCTION_TYPE_DTW) ? 1 : rows;
  if (needRows > INT32_MAX - resCurRow) {
    goto _OVER;
  }

  code = blockDataEnsureCapacity(pBlock, resCurRow + needRows);
  if (code != 0) {
    goto _OVER;
  }

  if (pInfo->analysisType == FUNCTION_TYPE_IMPUTATION) {
    SJson* pTarget = tjsonGetObjectItem(pJson, "target");
    if (pTarget == NULL) goto _OVER;

    SJson* pTsList = tjsonGetObjectItem(pJson, "ts");
    if (pTsList == NULL) goto _OVER;

    SJson* pMask = tjsonGetObjectItem(pJson, "mask");
    if (pMask == NULL) goto _OVER;

    int32_t listLen = tjsonGetArraySize(pTarget);
    if (listLen != rows) {
      goto _OVER;
    }

    if (pInfo->imputatSup.resTsSlot != -1) {
      SColumnInfoData* pResTsCol = taosArrayGet(pBlock->pDataBlock, pInfo->imputatSup.resTsSlot);
      if (pResTsCol != NULL) {
        // a short list would hand a NULL item to the value getter below
        if (tjsonGetArraySize(pTsList) != rows) {
          goto _OVER;
        }

        for (int32_t i = 0; i < rows; ++i) {
          SJson* tsJson = tjsonGetArrayItem(pTsList, i);
          tjsonGetObjectValueBigInt(tsJson, &tmpI64);
          colDataSetInt64(pResTsCol, resCurRow, &tmpI64);
          resCurRow++;
        }
      }
    }

    // the service returns doubles; convert to the type of the result column
    resCurRow = pBlock->info.rows;
    if (pResTargetCol->info.type == TSDB_DATA_TYPE_DOUBLE) {
      for (int32_t i = 0; i < rows; ++i) {
        SJson* targetJson = tjsonGetArrayItem(pTarget, i);
        tjsonGetObjectValueDouble(targetJson, &tmpDouble);
        colDataSetDouble(pResTargetCol, resCurRow, &tmpDouble);
        resCurRow++;
      }
    } else if (pResTargetCol->info.type == TSDB_DATA_TYPE_INT) {
      for (int32_t i = 0; i < rows; ++i) {
        SJson* targetJson = tjsonGetArrayItem(pTarget, i);
        tjsonGetObjectValueDouble(targetJson, &tmpDouble);
        int32_t t = tmpDouble;
        colDataSetInt32(pResTargetCol, resCurRow, &t);
        resCurRow++;
      }
    } else if (pResTargetCol->info.type == TSDB_DATA_TYPE_FLOAT) {
      for (int32_t i = 0; i < rows; ++i) {
        SJson* targetJson = tjsonGetArrayItem(pTarget, i);
        tjsonGetObjectValueDouble(targetJson, &tmpDouble);
        float t = tmpDouble;
        colDataSetFloat(pResTargetCol, resCurRow, &t);
        resCurRow++;
      }
    }

    if (pInfo->imputatSup.resMarkSlot != -1) {
      SColumnInfoData* pResMarkCol = taosArrayGet(pBlock->pDataBlock, pInfo->imputatSup.resMarkSlot);
      if (pResMarkCol != NULL) {
        if (tjsonGetArraySize(pMask) != rows) {
          goto _OVER;
        }

        resCurRow = pBlock->info.rows;
        for (int32_t i = 0; i < rows; ++i) {
          SJson* markJson = tjsonGetArrayItem(pMask, i);
          tjsonGetObjectValueBigInt(markJson, &tmpI64);
          int32_t v = tmpI64;
          colDataSetInt32(pResMarkCol, resCurRow, &v);
          resCurRow++;
        }
      }
    }
  } else if (pInfo->analysisType == FUNCTION_TYPE_DTW) {
    // dtw result example:
    // {'option': 'algo=dtw,radius=1', 'rows': 4, 'distance': 1.6, 'path': [(0, 0), (1, 0), (2, 0), (3, 1), (3, 2), (3, 3)]}

    SJson* pTarget = tjsonGetObjectItem(pJson, "distance");
    if (pTarget == NULL) goto _OVER;

    tjsonGetObjectValueDouble(pTarget, &tmpDouble);
    colDataSetDouble(pResTargetCol, resCurRow, &tmpDouble);
    rows = 1;
  } else if (pInfo->analysisType == FUNCTION_TYPE_DTW_PATH) {
    // dtw path results are the same as above
    SJson* pPath = tjsonGetObjectItem(pJson, "path");
    if (pPath == NULL) goto _OVER;

    int32_t listLen = tjsonGetArraySize(pPath);
    if (listLen != rows) {
      goto _OVER;
    }

    for (int32_t i = 0; i < rows; ++i) {
      SJson* pathJson = tjsonGetArrayItem(pPath, i);

      char buf[128 + VARSTR_HEADER_SIZE] = {0};
      code = buildDtwPathResult(pathJson, pResTargetCol, pId, buf, sizeof(buf));
      if (code != 0) {
        qError("%s failed to build path result, code:%s", pId, tstrerror(code));
        goto _OVER;
      }

      code = colDataSetVal(pResTargetCol, resCurRow, buf, false);
      if (code != 0) {
        qError("%s failed to set path result to column, code:%s", pId, tstrerror(code));
        goto _OVER;
      }

      resCurRow++;
    }
  } else if (pInfo->analysisType == FUNCTION_TYPE_TLCC) {
    // tlcc result example:
    // {'option': 'algo=tlcc,lag_start=-1,lag_end=1', 'rows': 3, 'lags': [-1, 0, 1], 'ccf_vals': [-0.24, 0.9, -0.2]}
    SJson* pLags = tjsonGetObjectItem(pJson, "lags");
    if (pLags == NULL) goto _OVER;

    SJson* pCcfVals = tjsonGetObjectItem(pJson, "ccf_vals");
    if (pCcfVals == NULL) goto _OVER;

    // both lists are indexed by row below, so each must provide exactly `rows` items
    if (tjsonGetArraySize(pLags) != rows || tjsonGetArraySize(pCcfVals) != rows) {
      goto _OVER;
    }

    for (int32_t i = 0; i < rows; ++i) {
      SJson* ccfValJson = tjsonGetArrayItem(pCcfVals, i);
      tjsonGetObjectValueDouble(ccfValJson, &tmpDouble);

      int64_t index = 0;
      SJson*  pLastIndexJson = tjsonGetArrayItem(pLags, i);
      tjsonGetObjectValueBigInt(pLastIndexJson, &index);

      char    buf[128 + VARSTR_HEADER_SIZE] = {0};
      int32_t n = snprintf(varDataVal(buf), tListLen(buf) - VARSTR_HEADER_SIZE, "(%" PRId64 ", %.4f)", index, tmpDouble);
      if (n <= 0) {
        // leave through _OVER so the response is released
        code = TSDB_CODE_ANA_ANODE_RETURN_ERROR;
        goto _OVER;
      }
      varDataSetLen(buf, n);

      code = colDataSetVal(pResTargetCol, resCurRow, buf, false);
      if (code != 0) {
        qError("%s failed to set tlcc result to column, code:%s", pId, tstrerror(code));
        goto _OVER;
      }

      resCurRow++;
    }
  }

  pBlock->info.rows += rows;

  tjsonDelete(pJson);
  return 0;

_OVER:
  tjsonDelete(pJson);
  if (code == 0) {
    code = TSDB_CODE_INVALID_JSON_FORMAT;
  }

  qError("%s failed to perform analysis finalize since %s", pId, tstrerror(code));
  return code;
}
705

UNCOV
706
/**
 * Run the analysis for the group that has been cached so far and append the result to the
 * operator's result block.
 *
 * When no rows are cached, the block list and row counter are cleared and nothing else
 * happens. Otherwise the request is finalised and sent, and the per-group counters and the
 * request buffer are released afterwards regardless of the outcome.
 *
 * @param pInfo      operator state
 * @param pTaskInfo  owning task; supplies the id used as the log prefix
 * @return TSDB_CODE_SUCCESS or the error code of the step that failed
 */
static int32_t doAnalysis(SAnalysisOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  const char*  id = GET_TASKID(pTaskInfo);
  SSDataBlock* pRes = pInfo->binfo.pRes;

  SBaseSupp* pSupp =
      (pInfo->analysisType == FUNCTION_TYPE_IMPUTATION) ? &pInfo->imputatSup.base : &pInfo->corrSupp.base;

  if (pSupp->numOfRows <= 0) {
    taosArrayClear(pSupp->pBlocks);
    pSupp->numOfRows = 0;
    return code;
  }

  qDebug("%s group:%" PRId64 ", do analysis, rows:%d", id, pSupp->groupId, pSupp->numOfRows);
  pRes->info.id.groupId = pSupp->groupId;

  code = finishBuildRequest(pInfo, pSupp, id);
  QUERY_CHECK_CODE(code, lino, _end);

  code = doAnalysisImpl(pInfo, pSupp, pRes, id);
  QUERY_CHECK_CODE(code, lino, _end);

  uInfo("%s block:%d, analysis finalize", id, pSupp->numOfBlocks);

_end:
  // The group is finished either way: start the next one with clean counters so that its row
  // count does not include the rows of this group.
  // NOTE(review): pSupp->win is not reset here, and the request buffer is destroyed without
  // being re-created in this function — confirm how the next group's state is re-initialised.
  pSupp->numOfBlocks = 0;
  pSupp->numOfRows = 0;
  taosAnalyBufDestroy(&pSupp->analyBuf);
  return code;
}
748

749
// Extract the input slots and the option string of the imputation() call from the
// projection list.
//
// Two call shapes are handled for each FUNCTION_TYPE_IMPUTATION target:
//   - 2 parameters: (column, ts)  -> the default option string "algo=moment" is used;
//   - otherwise:    the first parameter is the target column; the remaining parameters are
//                   scanned for the primary timestamp column and the option literal.
//
// pInfo   [out] receives the duplicated option string in pInfo->options
// pSupp   [out] receives tsSlot/tsPrecision/targetSlot/targetType and base.numOfCols
// pFuncs  [in]  list of target nodes to scan
// id      [in]  task id used as log prefix
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA when the first parameter is not a column,
// TSDB_CODE_ANA_INTERNAL_ERROR when no option string is available.
static int32_t doParseInputForImputation(SAnalysisOperatorInfo* pInfo, SImputationSupp* pSupp, SNodeList* pFuncs, const char* id) {
  int32_t code = 0;
  SNode*  pNode = NULL;

  FOREACH(pNode, pFuncs) {
    if ((nodeType(pNode) == QUERY_NODE_TARGET) && (nodeType(((STargetNode*)pNode)->pExpr) == QUERY_NODE_FUNCTION)) {
      SFunctionNode* pFunc = (SFunctionNode*)((STargetNode*)pNode)->pExpr;
      int32_t        numOfParam = LIST_LENGTH(pFunc->pParameterList);

      if (pFunc->funcType == FUNCTION_TYPE_IMPUTATION) {
        // the request always carries two columns: timestamp and value
        pSupp->base.numOfCols = 2;

        if (numOfParam == 2) {
          // column, ts
          SColumnNode* pTargetNode = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
          SColumnNode* pTsNode = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 1);

          pSupp->tsSlot = pTsNode->slotId;
          pSupp->tsPrecision = pTsNode->node.resType.precision;
          pSupp->targetSlot = pTargetNode->slotId;
          pSupp->targetType = pTargetNode->node.resType.type;

          // let's set the moment as the default imputation algorithm
          pInfo->options = taosStrdup("algo=moment");
        } else {
          // column, options, ts
          SColumnNode* pTargetNode = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
          if (pTargetNode == NULL || nodeType(pTargetNode) != QUERY_NODE_COLUMN) {
            // reading slotId/resType from a non-column node would yield garbage slots
            qError("%s the first parameter of imputation is not a column", id);
            return TSDB_CODE_INVALID_PARA;
          }

          bool assignTs = false;
          bool assignOpt = false;

          pSupp->targetSlot = pTargetNode->slotId;
          pSupp->targetType = pTargetNode->node.resType.type;

          // only the first primary-ts column and the first value literal are taken
          for (int32_t i = 0; i < numOfParam; ++i) {
            SNode* pParam = nodesListGetNode(pFunc->pParameterList, i);

            if (nodeType(pParam) == QUERY_NODE_COLUMN) {
              SColumnNode* pColNode = (SColumnNode*)pParam;
              if (pColNode->isPrimTs && (!assignTs)) {
                pSupp->tsSlot = pColNode->slotId;
                pSupp->tsPrecision = pColNode->node.resType.precision;
                assignTs = true;
              }
            } else if (nodeType(pParam) == QUERY_NODE_VALUE) {
              if (!assignOpt) {
                SValueNode* pOptNode = (SValueNode*)pParam;
                pInfo->options = taosStrdup(pOptNode->literal);
                assignOpt = true;
              }
            }
          }

          if (!assignOpt) {
            qError("%s option is missing, failed to do imputation", id);
            code = TSDB_CODE_ANA_INTERNAL_ERROR;
          }
        }
      }
    }
  }

  // covers both "no imputation function found" and a failed taosStrdup
  if (pInfo->options == NULL) {
    qError("%s option is missing or clone option string failed, failed to do imputation", id);
    code = TSDB_CODE_ANA_INTERNAL_ERROR;
  }

  return code;
}
827

UNCOV
828
int32_t doParseInputForDtw(SAnalysisOperatorInfo* pInfo, SCorrelationSupp* pSupp, SNodeList* pFuncs, const char* id) {
×
UNCOV
829
  int32_t code = 0;
×
830
  SNode*  pNode = NULL;
×
831

UNCOV
832
  FOREACH(pNode, pFuncs) {
×
833
    if ((nodeType(pNode) == QUERY_NODE_TARGET) && (nodeType(((STargetNode*)pNode)->pExpr) == QUERY_NODE_FUNCTION)) {
×
834
      SFunctionNode* pFunc = (SFunctionNode*)((STargetNode*)pNode)->pExpr;
×
835
      int32_t        numOfParam = LIST_LENGTH(pFunc->pParameterList);
×
836

837
      if (pFunc->funcType == FUNCTION_TYPE_DTW || pFunc->funcType == FUNCTION_TYPE_DTW_PATH) {
×
838
        // code = validInputParams(pFunc, id);
839
        // if (code) {
840
          // return code;
841
        // }
842

UNCOV
843
        pSupp->base.numOfCols = 2;
×
844

UNCOV
845
        if (numOfParam == 2) {
×
846
          // column1, column2
UNCOV
847
          SColumnNode* pTargetNode1 = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
×
848
          SColumnNode* pTargetNode2 = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 1);
×
849

850
          pSupp->targetSlot1 = pTargetNode1->slotId;
×
UNCOV
851
          pSupp->targetSlot1Type = pTargetNode1->node.resType.type;
×
852

853
          pSupp->targetSlot2 = pTargetNode2->slotId;
×
UNCOV
854
          pSupp->targetSlot2Type = pTargetNode2->node.resType.type;
×
855

856
          // let's set the default radius to be 1
UNCOV
857
          pInfo->options = taosStrdup("algo=dtw,radius=1");
×
858
        } else if (numOfParam == 3) {
×
859
          // column, options, ts
860

UNCOV
861
          SColumnNode* pTargetNode1 = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
×
862
          SColumnNode* pTargetNode2 = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 1);
×
863

UNCOV
864
          pSupp->targetSlot1 = pTargetNode1->slotId;
×
UNCOV
865
          pSupp->targetSlot1Type = pTargetNode1->node.resType.type;
×
866

867
          pSupp->targetSlot2 = pTargetNode2->slotId;
×
UNCOV
868
          pSupp->targetSlot2Type = pTargetNode2->node.resType.type;
×
869

870
          SValueNode* pOptNode = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 2);
×
871

872
          int32_t bufLen = strlen(pOptNode->literal) + 30;
×
873
          pInfo->options = taosMemoryMalloc(bufLen);
×
UNCOV
874
          if (pInfo->options == NULL) {
×
875
            code = terrno;
×
UNCOV
876
            qError("%s failed to prepare option buffer, code:%s", id, tstrerror(code));
×
877
            return code;
×
878
          }
879

880
          int32_t ret = snprintf(pInfo->options, bufLen, "%s,%s", pOptNode->literal, "algo=dtw");
×
881
          if (ret < 0 || ret >= bufLen) {
×
882
            code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
883
            qError("%s failed to clone options string, code:%s", id, tstrerror(code));
×
UNCOV
884
            return code;
×
885
          }
886
        } else {
887
          qError("%s too many parameters in dtw function", id);
×
888
          code = TSDB_CODE_INVALID_PARA;
×
889
          return code;
×
890
        }
891
      }
892
    }
893
  }
894

UNCOV
895
  if (pInfo->options == NULL) {
×
UNCOV
896
    qError("%s option is missing or clone option string failed, failed to do correlation analysis", id);
×
UNCOV
897
    code = TSDB_CODE_ANA_INTERNAL_ERROR;
×
898
  }
899

900
  return code;
×
901
}
902

UNCOV
903
static int32_t doParseInputForTlcc(SAnalysisOperatorInfo* pInfo, SCorrelationSupp* pSupp, SNodeList* pFuncs, const char* id) {
×
UNCOV
904
  int32_t code = 0;
×
905
  SNode*  pNode = NULL;
×
906

UNCOV
907
  FOREACH(pNode, pFuncs) {
×
908
    if ((nodeType(pNode) == QUERY_NODE_TARGET) && (nodeType(((STargetNode*)pNode)->pExpr) == QUERY_NODE_FUNCTION)) {
×
909
      SFunctionNode* pFunc = (SFunctionNode*)((STargetNode*)pNode)->pExpr;
×
910
      int32_t        numOfParam = LIST_LENGTH(pFunc->pParameterList);
×
911

912
      if (pFunc->funcType == FUNCTION_TYPE_TLCC) {
×
913
        // code = validInputParams(pFunc, id);
914
        // if (code) {
915
          // return code;
916
        // }
917

UNCOV
918
        pSupp->base.numOfCols = 2;
×
919

UNCOV
920
        if (numOfParam == 2) {
×
921
          // column1, column2
UNCOV
922
          SColumnNode* pTargetNode1 = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
×
923
          SColumnNode* pTargetNode2 = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 1);
×
924

925
          pSupp->targetSlot1 = pTargetNode1->slotId;
×
UNCOV
926
          pSupp->targetSlot1Type = pTargetNode1->node.resType.type;
×
927

928
          pSupp->targetSlot2 = pTargetNode2->slotId;
×
UNCOV
929
          pSupp->targetSlot2Type = pTargetNode2->node.resType.type;
×
930

931
          // let's set the default radius to be 1
UNCOV
932
          pInfo->options = taosStrdup("algo=tlcc,lag_start=-1,lag_end=1");
×
933
        } else if (numOfParam == 3) {
×
934
          // column, options, ts
935
          // SColumnNode* pTargetNode = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
936
          // if (nodeType(pTargetNode) != QUERY_NODE_COLUMN) {
937
          // return error
938
          // }
939

UNCOV
940
          SColumnNode* pTargetNode1 = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
×
UNCOV
941
          SColumnNode* pTargetNode2 = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 1);
×
942

UNCOV
943
          pSupp->targetSlot1 = pTargetNode1->slotId;
×
UNCOV
944
          pSupp->targetSlot1Type = pTargetNode1->node.resType.type;
×
945

946
          pSupp->targetSlot2 = pTargetNode2->slotId;
×
UNCOV
947
          pSupp->targetSlot2Type = pTargetNode2->node.resType.type;
×
948

949
          SValueNode* pOptNode = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 2);
×
950

951
          int32_t bufLen = strlen(pOptNode->literal) + 30;
×
952
          pInfo->options = taosMemoryMalloc(bufLen);
×
UNCOV
953
          if (pInfo->options == NULL) {
×
954
            code = terrno;
×
UNCOV
955
            qError("%s failed to prepare option buffer, code:%s", id, tstrerror(code));
×
956
            return code;
×
957
          }
958

959
          int32_t ret = snprintf(pInfo->options, bufLen, "%s,%s", pOptNode->literal, "algo=tlcc");
×
960
          if (ret < 0 || ret >= bufLen) {
×
961
            code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
962
            qError("%s failed to clone options string, code:%s", id, tstrerror(code));
×
UNCOV
963
            return code;
×
964
          }
965
        } else {
966
          qError("%s too many parameters in tlcc function", id);
×
967
          code = TSDB_CODE_INVALID_PARA;
×
968
          return code;
×
969
        }
970
      }
971
    }
972
  }
973

UNCOV
974
  if (pInfo->options == NULL) {
×
UNCOV
975
    qError("%s option is missing or clone option string failed, failed to do correlation analysis", id);
×
UNCOV
976
    code = TSDB_CODE_ANA_INTERNAL_ERROR;
×
977
  }
978

979
  return code;
×
980
}
981

UNCOV
982
// Record in which output slots the analysis result, the imputation row timestamp and the
// imputation mark have to be written. Slots that have no matching expression stay at -1.
// When several expressions map to the same kind, the last one wins. Always returns 0.
static int32_t doSetResSlot(SAnalysisOperatorInfo* pInfo, SExprSupp* pExprSup) {
  pInfo->resTargetSlot = -1;
  pInfo->imputatSup.resTsSlot = -1;
  pInfo->imputatSup.resMarkSlot = -1;

  for (int32_t idx = 0; idx < pExprSup->numOfExprs; ++idx) {
    SExprInfo* pCur = &pExprSup->pExprInfo[idx];
    int32_t    outSlot = pCur->base.resSchema.slotId;

    switch (pCur->pExpr->_function.functionType) {
      case FUNCTION_TYPE_IMPUTATION:
      case FUNCTION_TYPE_DTW:
      case FUNCTION_TYPE_DTW_PATH:
      case FUNCTION_TYPE_TLCC:
        pInfo->resTargetSlot = outSlot;
        break;
      case FUNCTION_TYPE_IMPUTATION_ROWTS:
        pInfo->imputatSup.resTsSlot = outSlot;
        break;
      case FUNCTION_TYPE_IMPUTATION_MARK:
        pInfo->imputatSup.resMarkSlot = outSlot;
        break;
      default:
        // not an analysis related expression
        break;
    }
  }

  return 0;
}
1003

UNCOV
1004
// Reset the fields shared by all analysis kinds to their defaults: empty counters,
// default timeout and white-noise check, group 0 and an inverted (empty) time window.
void doInitBaseOptions(SBaseSupp* pSupp) {
  // start with an empty window so that the first row always extends it
  pSupp->win.skey = INT64_MAX;
  pSupp->win.ekey = INT64_MIN;

  pSupp->groupId = 0;

  // service side defaults
  pSupp->timeout = ANALY_DEFAULT_TIMEOUT;
  pSupp->wncheck = ANALY_DEFAULT_WNCHECK;

  // nothing buffered yet
  pSupp->numOfBlocks = 0;
  pSupp->numOfRows = 0;
  pSupp->numOfCols = 0;
}
×
1015

UNCOV
1016
// Reset the imputation support struct: base defaults, data frequency "S" and all
// slot/type/precision fields marked as unset (-1).
void doInitImputOptions(SImputationSupp* pSupp) {
  doInitBaseOptions(&pSupp->base);

  // input description is unknown until the function parameters are parsed
  pSupp->tsPrecision = -1;
  pSupp->targetType = -1;
  pSupp->targetSlot = -1;
  pSupp->tsSlot = -1;

  // default data frequency; units: d(day), h(hour), m(minute), s(second), ms(millisecond), us(microsecond)
  pSupp->freq[0] = 'S';
  pSupp->freq[1] = '\0';
}
×
1027

1028
// Reset the correlation support struct: base defaults, dtw radius 1, tlcc lag window
// [-1, 1], and both input slots unset (-1) with INT as placeholder type.
void doInitDtwOptions(SCorrelationSupp* pSupp) {
  doInitBaseOptions(&pSupp->base);

  // inputs are resolved later from the function parameters
  pSupp->targetSlot1 = -1;
  pSupp->targetSlot2 = -1;
  pSupp->targetSlot1Type = TSDB_DATA_TYPE_INT;
  pSupp->targetSlot2Type = TSDB_DATA_TYPE_INT;

  // algorithm defaults
  pSupp->radius = 1;
  pSupp->lagStart = -1;
  pSupp->lagEnd = 1;
}
×
1040

1041
// Read the optional "freq" option and store it in pSupp->freq.
//
// The value must match "<count><unit>" where the count is optional and the unit is one of
// ms, us, ns, s, m, h, d, w (case-insensitive). When the option is absent, pSupp->freq is
// left unchanged.
//
// pSupp     [out] receives the validated frequency string
// pHashMap  [in]  parsed option map; values are length-delimited strings
// id        [in]  task id used as log prefix
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for a malformed/oversized value or a
// regex compilation failure, or terrno when the value cannot be duplicated.
int32_t parseFreq(SImputationSupp* pSupp, SHashObj* pHashMap, const char* id) {
  char* pFreq = taosHashGet(pHashMap, FREQ_STR, strlen(FREQ_STR));
  if (pFreq == NULL) {
    qDebug("%s not specify data freq, default: %s", id, pSupp->freq);
    return TSDB_CODE_SUCCESS;
  }

  int32_t len = taosHashGetValueSize(pFreq);

  // The regex below accepts an arbitrarily long digit run, so bound the value before it is
  // copied. NOTE(review): assumes pSupp->freq is a fixed-size char array - confirm.
  if (len < 0 || (size_t)len >= sizeof(pSupp->freq)) {
    qError("%s freq parameter is too long, length:%d", id, len);
    return TSDB_CODE_INVALID_PARA;
  }

  // the hash value is not guaranteed to be NUL-terminated; regexec needs a C string
  char* p = taosStrndupi(pFreq, len);
  if (p == NULL) {
    int32_t err = terrno;  // capture before logging
    qError("%s failed to clone the freq param, code:%s", id, tstrerror(err));
    return err;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  regex_t regex;

  if (regcomp(&regex, "^([1-9][0-9]*|[1-9]*)(ms|us|ns|[smhdw])$", REG_EXTENDED | REG_ICASE) != 0) {
    qError("%s failed to compile regex for freq param", id);
    code = TSDB_CODE_INVALID_PARA;
  } else {
    int32_t res = regexec(&regex, p, 0, NULL, 0);
    regfree(&regex);

    if (res != 0) {
      qError("%s invalid freq parameter: %s", id, p);
      code = TSDB_CODE_INVALID_PARA;
    } else {
      tstrncpy(pSupp->freq, p, len + 1);
      qDebug("%s data freq:%s", id, pSupp->freq);
    }
  }

  // single release point: every path after the duplication ends here
  taosMemoryFreeClear(p);
  return code;
}
1080

UNCOV
1081
// Parse a decimal int32 from an option value returned by taosHashGet().
//
// Option values are stored as length-delimited text (see how the "freq" option is handled
// in this file), so the bytes must be parsed, not reinterpreted as a binary integer.
// Leading/trailing blanks and an optional sign are accepted; anything else, an empty
// value or an out-of-range number makes the function return false and leave *pRes untouched.
static bool parseInt32OptValue(char* pVal, int32_t* pRes) {
  int32_t len = taosHashGetValueSize(pVal);
  int32_t pos = 0;

  while (pos < len && (pVal[pos] == ' ' || pVal[pos] == '\t')) {
    pos++;
  }

  bool negative = false;
  if (pos < len && (pVal[pos] == '-' || pVal[pos] == '+')) {
    negative = (pVal[pos] == '-');
    pos++;
  }

  int64_t value = 0;
  int32_t numOfDigits = 0;
  while (pos < len && pVal[pos] >= '0' && pVal[pos] <= '9') {
    value = value * 10 + (pVal[pos] - '0');
    if (value > (int64_t)INT32_MAX + 1) {
      return false;  // stop early, the accumulator must not overflow
    }
    numOfDigits++;
    pos++;
  }

  while (pos < len && (pVal[pos] == ' ' || pVal[pos] == '\t')) {
    pos++;
  }

  // a stored terminating NUL, if any, ends the text
  if (numOfDigits == 0 || (pos < len && pVal[pos] != '\0')) {
    return false;
  }

  if (negative) {
    value = -value;
  }

  if (value > INT32_MAX || value < INT32_MIN) {
    return false;
  }

  *pRes = (int32_t)value;
  return true;
}

// Read the optional dtw search radius ("radius") into pSupp->radius.
// A missing or malformed option keeps the current (default) radius.
void parseRadius(SCorrelationSupp* pSupp, SHashObj* pHashMap, const char* id) {
  char* pRadius = taosHashGet(pHashMap, RADIUS_STR, strlen(RADIUS_STR));
  if (pRadius == NULL) {
    qDebug("%s not specify search radius, default: %d", id, pSupp->radius);
    return;
  }

  if (parseInt32OptValue(pRadius, &pSupp->radius)) {
    qDebug("%s dtw search radius:%d", id, pSupp->radius);
  } else {
    qError("%s invalid dtw search radius, use default: %d", id, pSupp->radius);
  }
}

// Read the optional tlcc lag window ("lag_start", "lag_end") into pSupp->lagStart and
// pSupp->lagEnd. A missing or malformed bound keeps its current (default) value.
void parseLag(SCorrelationSupp* pSupp, SHashObj* pHashMap, const char* id) {
  char* pLagStart = taosHashGet(pHashMap, LAGSTART_STR, strlen(LAGSTART_STR));
  if (pLagStart == NULL) {
    qDebug("%s not specify tlcc lag start, default: %d", id, pSupp->lagStart);
  } else if (parseInt32OptValue(pLagStart, &pSupp->lagStart)) {
    qDebug("%s tlcc lag start:%d", id, pSupp->lagStart);
  } else {
    qError("%s invalid tlcc lag start, use default: %d", id, pSupp->lagStart);
  }

  char* pLagEnd = taosHashGet(pHashMap, LAGEND_STR, strlen(LAGEND_STR));
  if (pLagEnd == NULL) {
    qDebug("%s not specify tlcc lag end, default: %d", id, pSupp->lagEnd);
  } else if (parseInt32OptValue(pLagEnd, &pSupp->lagEnd)) {
    qDebug("%s tlcc lag end:%d", id, pSupp->lagEnd);
  } else {
    qError("%s invalid tlcc lag end, use default: %d", id, pSupp->lagEnd);
  }
}
×
1108

UNCOV
1109
int32_t estResultRowsAfterImputation(int32_t rows, int64_t skey, int64_t ekey, int32_t prec, const char* pFreq, const char* id) {
×
1110
  int64_t range = ekey - skey;
×
UNCOV
1111
  double  factor = 0;
×
1112
  if (prec == TSDB_TIME_PRECISION_MILLI) {
×
UNCOV
1113
    if (strcmp(pFreq, "h") == 0) {
×
1114
      factor = 0.001 * 1/3600;
×
1115
    } else if (strcmp(pFreq, "m") == 0) {
×
1116
      factor = 0.001 * 1/60;
×
1117
    } else if (strcmp(pFreq, "s") == 0) {
×
1118
      factor = 0.001;
×
1119
    } else if (strcmp(pFreq, "ms") == 0) {
×
1120
      factor = 1;
×
1121
    } else if (strcmp(pFreq, "us") == 0) {
×
1122
      factor *= 1000;
×
1123
    } else if (strcmp(pFreq, "ns") == 0) {
×
1124
      factor *= 1000000;
×
1125
    }
1126

1127
    int64_t num = range * factor - rows;
×
1128
    if (num > ANALY_MAX_IMPUT_ROWS) {
×
1129
      qError("%s too many rows to imputation, est:%"PRId64, id, num);
×
UNCOV
1130
      return TSDB_CODE_INVALID_PARA;
×
1131
    }
1132
  } else if (prec == TSDB_TIME_PRECISION_MICRO) {
×
1133
    if (strcmp(pFreq, "h") == 0) {
×
1134
      factor = 0.000001 * 1/3600;
×
1135
    } else if (strcmp(pFreq, "m") == 0) {
×
UNCOV
1136
      factor = 0.000001 * 1/60;
×
1137
    } else if (strcmp(pFreq, "s") == 0) {
×
1138
      factor = 0.000001;
×
1139
    } else if (strcmp(pFreq, "ms") == 0) {
×
1140
      factor = 1000;
×
1141
    } else if (strcmp(pFreq, "us") == 0) {
×
1142
      factor *= 1;
×
1143
    } else if (strcmp(pFreq, "ns") == 0) {
×
1144
      factor *= 1000;
×
1145
    }
1146

1147
    int64_t num = range * factor - rows;
×
1148
    if (num > ANALY_MAX_IMPUT_ROWS) {
×
1149
      qError("%s too many rows to imputation, est:%"PRId64, id, num);
×
UNCOV
1150
      return TSDB_CODE_INVALID_PARA;
×
1151
    }
1152
  } else if (prec == TSDB_TIME_PRECISION_NANO) {
×
1153
    if (strcmp(pFreq, "h") == 0) {
×
1154
      factor = 0.000000001 * 1/3600;
×
1155
    } else if (strcmp(pFreq, "m") == 0) {
×
UNCOV
1156
      factor = 0.000000001 * 1/60;
×
1157
    } else if (strcmp(pFreq, "s") == 0) {
×
1158
      factor = 0.000000001;
×
1159
    } else if (strcmp(pFreq, "ms") == 0) {
×
1160
      factor = 1000000;
×
1161
    } else if (strcmp(pFreq, "us") == 0) {
×
1162
      factor *= 1000;
×
1163
    } else if (strcmp(pFreq, "ns") == 0) {
×
1164
      factor *= 1;
×
1165
    }
1166

1167
    int64_t num = range * factor - rows;
×
1168
    if (num > ANALY_MAX_IMPUT_ROWS) {
×
1169
      qError("%s too many rows to imputation, est:%"PRId64, id, num);
×
UNCOV
1170
      return TSDB_CODE_INVALID_PARA;
×
1171
    }
1172
  }
1173

1174
  return TSDB_CODE_SUCCESS;
×
1175
}
1176

UNCOV
1177
// Parse pInfo->options: resolve the algorithm name/url, then read the common options
// (timeout, wncheck) and the options specific to the analysis type (freq for imputation,
// radius for dtw/dtw_path, lag window for tlcc).
//
// Returns TSDB_CODE_SUCCESS or the first error reported by the option parsers. The
// temporary option map is released on every path that follows its successful creation.
int32_t doParseOption(SAnalysisOperatorInfo* pInfo, const char* id) {
  int32_t    lino = 0;
  SHashObj*  pOptMap = NULL;
  SBaseSupp* pBase = NULL;
  int32_t    algoType = 0;

  int32_t code = taosAnalyGetOpts(pInfo->options, &pOptMap);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // map the SQL function to the algorithm family known by the analysis service
  switch (pInfo->analysisType) {
    case FUNCTION_TYPE_IMPUTATION:
      algoType = ANALY_ALGO_TYPE_IMPUTATION;
      break;
    case FUNCTION_TYPE_DTW:
    case FUNCTION_TYPE_DTW_PATH:
    case FUNCTION_TYPE_TLCC:
      algoType = ANALY_ALGO_TYPE_CORREL;
      break;
    default:
      break;
  }

  code = taosAnalysisParseAlgo(pInfo->options, pInfo->algoName, pInfo->algoUrl, algoType, tListLen(pInfo->algoUrl),
                               pOptMap, id);
  TSDB_CHECK_CODE(code, lino, _end);

  // extract the timeout and white-noise-check parameters shared by all analysis kinds
  if (pInfo->analysisType == FUNCTION_TYPE_IMPUTATION) {
    pBase = &pInfo->imputatSup.base;
  } else {
    pBase = &pInfo->corrSupp.base;
  }

  pBase->timeout = taosAnalysisParseTimout(pOptMap, id);
  pBase->wncheck = taosAnalysisParseWncheck(pOptMap, id);

  switch (pInfo->analysisType) {
    case FUNCTION_TYPE_IMPUTATION:
      // extract data freq
      code = parseFreq(&pInfo->imputatSup, pOptMap, id);
      break;
    case FUNCTION_TYPE_DTW:
    case FUNCTION_TYPE_DTW_PATH:
      parseRadius(&pInfo->corrSupp, pOptMap, id);
      break;
    case FUNCTION_TYPE_TLCC:
      parseLag(&pInfo->corrSupp, pOptMap, id);
      break;
    default:
      break;
  }

_end:
  taosHashCleanup(pOptMap);
  return code;
}
1218

1219
// Open the JSON column buffer used to ship the input data to the analysis service and
// write its header: column metadata, the data section start and the start of every column.
//
// Imputation writes the columns ("ts", "val"); dtw/dtw_path/tlcc write ("val", "val1").
// On any failure the buffer is closed and destroyed before the error code is returned.
static int32_t doCreateBuf(SAnalysisOperatorInfo* pInfo, const char* pId) {
  bool isImputation = (pInfo->analysisType == FUNCTION_TYPE_IMPUTATION);

  SBaseSupp*    pBase = isImputation ? &pInfo->imputatSup.base : &pInfo->corrSupp.base;
  SAnalyticBuf* pAnalyBuf = &pBase->analyBuf;
  int64_t       nowNs = taosGetTimestampNs();

  // operator address + timestamp make up the temporary file name
  pAnalyBuf->bufType = ANALYTICS_BUF_TYPE_JSON_COL;
  snprintf(pAnalyBuf->fileName, sizeof(pAnalyBuf->fileName), "%s/tdengine-analysis-%p-%" PRId64, tsTempDir, pInfo,
           nowNs);

  int32_t code = tsosAnalyBufOpen(pAnalyBuf, pBase->numOfCols, pId);

  // each step runs only while all previous steps succeeded
  if (code == 0) {
    if (isImputation) {
      code = taosAnalyBufWriteColMeta(pAnalyBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, "ts");
      if (code == 0) {
        code = taosAnalyBufWriteColMeta(pAnalyBuf, 1, pInfo->imputatSup.targetType, "val");
      }
    } else if (pInfo->analysisType == FUNCTION_TYPE_DTW || pInfo->analysisType == FUNCTION_TYPE_DTW_PATH ||
               pInfo->analysisType == FUNCTION_TYPE_TLCC) {
      code = taosAnalyBufWriteColMeta(pAnalyBuf, 0, pInfo->corrSupp.targetSlot1Type, "val");
      if (code == 0) {
        code = taosAnalyBufWriteColMeta(pAnalyBuf, 1, pInfo->corrSupp.targetSlot2Type, "val1");
      }
    }
  }

  if (code == 0) {
    code = taosAnalyBufWriteDataBegin(pAnalyBuf);
  }

  for (int32_t col = 0; code == 0 && col < pBase->numOfCols; ++col) {
    code = taosAnalyBufWriteColBegin(pAnalyBuf, col);
  }

  if (code != 0) {
    (void)taosAnalyBufClose(pAnalyBuf);
    taosAnalyBufDestroy(pAnalyBuf);
  }

  return code;
}
1262

1263
#else
1264

1265
// Fallback used when the build does not define USE_ANALYTICS: the analysis operator
// cannot be created, so every request is answered with TSDB_CODE_OPS_NOT_SUPPORT.
// None of the parameters is read.
int32_t createGenericAnalysisOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo,
                                          SOperatorInfo** pOptrInfo) {
  (void)downstream;
  (void)physiNode;
  (void)pTaskInfo;
  (void)pOptrInfo;

  return TSDB_CODE_OPS_NOT_SUPPORT;
}
1269
// Fallback used when the build does not define USE_ANALYTICS: no operator state is ever
// created in that configuration, so this function does nothing.
void analysisDestroyOperatorInfo(void* param) {
  (void)param;
}
1270

1271
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc