• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3903

24 Apr 2025 11:36AM UTC coverage: 55.307% (+0.09%) from 55.213%
#3903

push

travis-ci

happyguoxy
Sync branches at 2025-04-24 19:35

175024 of 316459 relevant lines covered (55.31%)

1151858.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

9.34
/source/common/src/tanalytics.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "tanalytics.h"
18
#include "ttypes.h"
19
#include "tutil.h"
20

21
#ifdef USE_ANALYTICS
22
#include <curl/curl.h>
23

24
#define ANALYTICS_ALOG_SPLIT_CHAR ","
25

26
typedef struct {
27
  int64_t       ver;
28
  SHashObj     *hash;  // algoname:algotype -> SAnalyticsUrl
29
  TdThreadMutex lock;
30
} SAlgoMgmt;
31

32
/* Accumulates the body of an HTTP response; filled in by taosCurlWriteData. */
typedef struct {
  // Heap buffer holding every byte received so far, kept NUL-terminated.
  char   *data;
  // Number of payload bytes in `data`, not counting the terminator.
  int64_t dataLen;
} SCurlResp;
36

37
static SAlgoMgmt tsAlgos = {0};
38
static int32_t   taosAnalyBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pContLen);
39

40
/*
 * Map an algorithm type to its display name.
 *
 * Returns "anomaly-detection" or "forecast" for the two known types and
 * "unknown" for every other value. The returned string is a literal and must
 * not be freed.
 */
const char *taosAnalysisAlgoType(EAnalAlgoType type) {
  if (type == ANALY_ALGO_TYPE_ANOMALY_DETECT) {
    return "anomaly-detection";
  }

  if (type == ANALY_ALGO_TYPE_FORECAST) {
    return "forecast";
  }

  return "unknown";
}
50

51
/*
 * Map an algorithm type to the name used for it in URLs.
 *
 * Returns "anomaly-detect" or "forecast" for the two known types and
 * "unknown" for every other value. The returned string is a literal.
 */
const char *taosAnalyAlgoUrlStr(EAnalAlgoType type) {
  const char *urlPart = "unknown";

  if (type == ANALY_ALGO_TYPE_ANOMALY_DETECT) {
    urlPart = "anomaly-detect";
  } else if (type == ANALY_ALGO_TYPE_FORECAST) {
    urlPart = "forecast";
  }

  return urlPart;
}
61

62
EAnalAlgoType taosAnalyAlgoInt(const char *name) {
×
63
  for (EAnalAlgoType i = 0; i < ANALY_ALGO_TYPE_END; ++i) {
×
64
    if (strcasecmp(name, taosAnalysisAlgoType(i)) == 0) {
×
65
      return i;
×
66
    }
67
  }
68

69
  return ANALY_ALGO_TYPE_END;
×
70
}
71

72
int32_t taosAnalyticsInit() {
70✔
73
  if (curl_global_init(CURL_GLOBAL_ALL) != 0) {
70✔
74
    uError("failed to init curl");
×
75
    return -1;
×
76
  }
77

78
  tsAlgos.ver = 0;
70✔
79
  if (taosThreadMutexInit(&tsAlgos.lock, NULL) != 0) {
70✔
80
    uError("failed to init algo mutex");
×
81
    return -1;
×
82
  }
83

84
  tsAlgos.hash = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
70✔
85
  if (tsAlgos.hash == NULL) {
70✔
86
    uError("failed to init algo hash");
×
87
    return -1;
×
88
  }
89

90
  uInfo("analysis env is initialized");
70✔
91
  return 0;
70✔
92
}
93

94
/*
 * Release an algorithm table: free the `url` string owned by every
 * SAnalyticsUrl entry, then destroy the table itself.
 */
static void taosAnalyFreeHash(SHashObj *hash) {
  for (void *pEntry = taosHashIterate(hash, NULL); pEntry != NULL; pEntry = taosHashIterate(hash, pEntry)) {
    SAnalyticsUrl *pItem = (SAnalyticsUrl *)pEntry;
    taosMemoryFree(pItem->url);
  }

  taosHashCleanup(hash);
}
70✔
103

104
void taosAnalyticsCleanup() {
70✔
105
  curl_global_cleanup();
70✔
106
  if (taosThreadMutexDestroy(&tsAlgos.lock) != 0) {
70✔
107
    uError("failed to destroy anal lock");
×
108
  }
109
  taosAnalyFreeHash(tsAlgos.hash);
70✔
110
  tsAlgos.hash = NULL;
70✔
111
  uInfo("analysis env is cleaned up");
70✔
112
}
70✔
113

114
/*
 * Offer a new algorithm table to the registry.
 *
 * The function takes ownership of `pHash` in every case:
 *   - when `newVer` is larger than the installed version, `pHash` becomes the
 *     installed table and the previous table is released;
 *   - otherwise (stale version, or the registry lock cannot be taken) `pHash`
 *     itself is released.
 *
 * The version comparison is done while holding the lock so that two
 * concurrent updates cannot both pass the check and install out of order.
 */
void taosAnalyUpdate(int64_t newVer, SHashObj *pHash) {
  // Table to release once the lock is dropped; stays `pHash` unless it gets installed.
  SHashObj *pDiscard = pHash;

  if (taosThreadMutexLock(&tsAlgos.lock) == 0) {
    if (newVer > tsAlgos.ver) {
      pDiscard = tsAlgos.hash;
      tsAlgos.ver = newVer;
      tsAlgos.hash = pHash;
    }

    if (taosThreadMutexUnlock(&tsAlgos.lock) != 0) {
      uError("failed to unlock hash");
    }
  } else {
    uError("failed to lock hash");
  }

  // Freeing happens outside the critical section, as before.
  taosAnalyFreeHash(pDiscard);
}
×
129

130
/*
 * Parse an option string of the form "k1=v1,k2=v2,..." into a hash table.
 *
 * The string is copied, de-quoted with strdequote, split on
 * ANALYTICS_ALOG_SPLIT_CHAR and each item is split on "=". Key and value are
 * trimmed with strtrim; items without a "=" are skipped, and a duplicated key
 * (TSDB_CODE_DUP_KEY from taosHashPut) is not treated as an error.
 *
 * @param pOption   option string; NULL yields TSDB_CODE_INVALID_PARA
 * @param pOptHash  receives the new table; NULL yields TSDB_CODE_INVALID_PARA
 * @return 0 on success, otherwise an error code.
 *
 * Ownership: on success the caller releases *pOptHash with taosHashCleanup.
 * When taosHashPut fails, the partially filled table is left in *pOptHash and
 * the caller still has to release it; in every other failure *pOptHash is NULL.
 */
int32_t taosAnalyGetOpts(const char *pOption, SHashObj **pOptHash) {
  int32_t code = 0;
  int32_t num = 0;
  char   *pTmp = NULL;
  char  **pList = NULL;

  if (pOptHash == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  (*pOptHash) = NULL;

  if (pOption == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  pTmp = taosStrdup(pOption);
  if (pTmp == NULL) {
    return terrno;
  }

  (void)strdequote(pTmp);
  pList = strsplit(pTmp, ANALYTICS_ALOG_SPLIT_CHAR, &num);

  (*pOptHash) = taosHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), 1, HASH_NO_LOCK);
  if ((*pOptHash) == NULL) {
    code = terrno;  // capture before the frees below can disturb it
    goto _OVER;
  }

  for (int32_t i = 0; i < num; ++i) {
    int32_t parts = 0;
    char  **pParts = strsplit(pList[i], "=", &parts);

    if (parts >= 2) {
      size_t keyLen = strtrim(pParts[0]);
      size_t valLen = strtrim(pParts[1]);

      code = taosHashPut(*pOptHash, pParts[0], keyLen, pParts[1], valLen);
      if (code == TSDB_CODE_DUP_KEY) {
        code = TSDB_CODE_SUCCESS;
      }
    }
    // parts < 2: invalid parameter, ignore it and continue

    taosMemoryFree(pParts);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }
  }

_OVER:
  taosMemoryFree(pTmp);
  taosMemoryFree(pList);
  return code;
}
186

187
bool taosAnalyGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) {
×
188
  SHashObj* p = NULL;
×
189
  int32_t code = taosAnalyGetOpts(option, &p);
×
190
  if (code != TSDB_CODE_SUCCESS) {
×
191
    if (p != NULL) {
×
192
      taosHashCleanup(p);
×
193
      p = NULL;
×
194
    }
195
    return false;
×
196
  }
197

198
  void* pVal = taosHashGet(p, optName, strlen(optName));
×
199
  if (pVal == NULL) {
×
200
    taosHashCleanup(p);
×
201
    return false;
×
202
  }
203

204
  int32_t valLen = taosHashGetValueSize(pVal);
×
205

206
  if (optValue != NULL && optMaxLen >= 1) {
×
207
    int32_t len = MIN(valLen + 1, optMaxLen);
×
208
    tstrncpy(optValue, (char *)pVal, len);
×
209
  }
210

211
  taosHashCleanup(p);
×
212
  return true;
×
213
}
214

215
/*
 * Resolve the URL registered for an algorithm.
 *
 * The lookup key is "<type>:<algoName>", lower-cased, and its length counts
 * the trailing NUL (1 + the value returned by tsnprintf).
 *
 * @param algoName  algorithm name
 * @param type      algorithm type
 * @param url       output buffer; set to "" unless a URL is found
 * @param urlLen    capacity of `url`
 * @return 0 when the URL was copied into `url`;
 *         TSDB_CODE_ANA_ALGO_NOT_FOUND when no entry exists;
 *         the lock error when the registry mutex cannot be taken;
 *         TSDB_CODE_OUT_OF_MEMORY when the mutex cannot be released
 *         (kept from the original behaviour).
 *
 * NOTE(review): the entry obtained with taosHashAcquire is not handed back to
 * the table here — confirm against the hash API whether a release is needed.
 */
int32_t taosAnalyGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) {
  int32_t code = 0;
  char    name[TSDB_ANALYTIC_ALGO_KEY_LEN] = {0};
  int32_t nameLen = 1 + tsnprintf(name, sizeof(name) - 1, "%d:%s", type, algoName);

  (void)strntolower(name, name, nameLen);

  // Never leave the output untouched: previously a failed lock returned
  // success with `url` holding whatever the caller had in it.
  url[0] = 0;

  int32_t lockCode = taosThreadMutexLock(&tsAlgos.lock);
  if (lockCode != 0) {
    uError("algo:%s, type:%s, failed to lock hash", algoName, taosAnalysisAlgoType(type));
    return lockCode;
  }

  SAnalyticsUrl *pUrl = taosHashAcquire(tsAlgos.hash, name, nameLen);
  if (pUrl != NULL) {
    tstrncpy(url, pUrl->url, urlLen);
    uDebug("algo:%s, type:%s, url:%s", algoName, taosAnalysisAlgoType(type), url);
  } else {
    code = TSDB_CODE_ANA_ALGO_NOT_FOUND;
    uError("algo:%s, type:%s, url not found", algoName, taosAnalysisAlgoType(type));
  }

  if (taosThreadMutexUnlock(&tsAlgos.lock) != 0) {
    uError("failed to unlock hash");
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return code;
}
241

242
/* Return the version of the installed algorithm table (read without taking the table lock). */
int64_t taosAnalyGetVersion() {
  int64_t installedVer = tsAlgos.ver;
  return installedVer;
}
3,689✔
243

244
static size_t taosCurlWriteData(char *pCont, size_t contLen, size_t nmemb, void *userdata) {
×
245
  SCurlResp *pRsp = userdata;
×
246
  if (contLen == 0 || nmemb == 0 || pCont == NULL) {
×
247
    pRsp->dataLen = 0;
×
248
    pRsp->data = NULL;
×
249
    uError("curl response is received, len:%" PRId64, pRsp->dataLen);
×
250
    return 0;
×
251
  }
252

253
  int64_t newDataSize = (int64_t)contLen * nmemb;
×
254
  int64_t size = pRsp->dataLen + newDataSize;
×
255

256
  if (pRsp->data == NULL) {
×
257
    pRsp->data = taosMemoryMalloc(size + 1);
×
258
    if (pRsp->data == NULL) {
×
259
      uError("failed to prepare recv buffer for post rsp, len:%d, code:%s", (int32_t)size + 1, tstrerror(terrno));
×
260
      return 0;  // return the recv length, if failed, return 0
×
261
    }
262
  } else {
263
    char *p = taosMemoryRealloc(pRsp->data, size + 1);
×
264
    if (p == NULL) {
×
265
      uError("failed to prepare recv buffer for post rsp, len:%d, code:%s", (int32_t)size + 1, tstrerror(terrno));
×
266
      return 0;  // return the recv length, if failed, return 0
×
267
    }
268

269
    pRsp->data = p;
×
270
  }
271

272
  if (pRsp->data != NULL) {
×
273
    (void)memcpy(pRsp->data + pRsp->dataLen, pCont, newDataSize);
×
274

275
    pRsp->dataLen = size;
×
276
    pRsp->data[size] = 0;
×
277

278
    uDebugL("curl response is received, len:%" PRId64 ", content:%s", size, pRsp->data);
×
279
    return newDataSize;
×
280
  } else {
281
    pRsp->dataLen = 0;
×
282
    uError("failed to malloc curl response");
×
283
    return 0;
×
284
  }
285
}
286

287
/*
 * Issue an HTTP GET to `url` and collect the body into `pRsp` through
 * taosCurlWriteData. The request uses a fixed 100 ms timeout.
 *
 * @return 0 on success, -1 when no curl handle can be created, otherwise the
 *         CURLcode of the option or transfer step that failed.
 */
static int32_t taosCurlGetRequest(const char *url, SCurlResp *pRsp) {
  CURL    *curl = NULL;
  CURLcode code = CURLE_OK;

  curl = curl_easy_init();
  if (curl == NULL) {
    uError("failed to create curl handle");
    return -1;
  }

  // Keep the result of every setopt: jumping out with `code` still 0 would
  // report a request that was never sent as a success.
  if ((code = curl_easy_setopt(curl, CURLOPT_URL, url)) != CURLE_OK) goto _OVER;
  if ((code = curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData)) != CURLE_OK) goto _OVER;
  if ((code = curl_easy_setopt(curl, CURLOPT_WRITEDATA, pRsp)) != CURLE_OK) goto _OVER;
  // Numeric options are read by libcurl as `long`.
  if ((code = curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, 100L)) != CURLE_OK) goto _OVER;

  uDebug("curl get request will sent, url:%s", url);
  code = curl_easy_perform(curl);
  if (code != CURLE_OK) {
    uError("failed to perform curl action, code:%d", code);
  }

_OVER:
  curl_easy_cleanup(curl);
  return code;
}
312

313
/*
 * Issue an HTTP POST with a JSON body to `url` and collect the response body
 * into `pRsp` through taosCurlWriteData.
 *
 * @param buf      request body
 * @param bufLen   length of the request body in bytes
 * @param timeout  value passed to CURLOPT_TIMEOUT_MS
 * @return 0 on success, -1 when no curl handle can be created, otherwise the
 *         CURLcode of the step that failed (CURLE_OUT_OF_MEMORY when the
 *         header list cannot be built).
 */
static int32_t taosCurlPostRequest(const char *url, SCurlResp *pRsp, const char *buf, int32_t bufLen, int32_t timeout) {
  struct curl_slist *headers = NULL;
  CURL              *curl = NULL;
  CURLcode           code = CURLE_OK;

  curl = curl_easy_init();
  if (curl == NULL) {
    uError("failed to create curl handle");
    return -1;
  }

  headers = curl_slist_append(headers, "Content-Type:application/json;charset=UTF-8");
  if (headers == NULL) {
    code = CURLE_OUT_OF_MEMORY;
    goto _OVER;
  }

  // Keep the result of every setopt: jumping out with `code` still 0 would
  // report a request that was never sent as a success.
  if ((code = curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers)) != CURLE_OK) goto _OVER;
  if ((code = curl_easy_setopt(curl, CURLOPT_URL, url)) != CURLE_OK) goto _OVER;
  if ((code = curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData)) != CURLE_OK) goto _OVER;
  if ((code = curl_easy_setopt(curl, CURLOPT_WRITEDATA, pRsp)) != CURLE_OK) goto _OVER;
  // Numeric options are read by libcurl as `long`, so widen explicitly.
  if ((code = curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, (long)timeout)) != CURLE_OK) goto _OVER;
  if ((code = curl_easy_setopt(curl, CURLOPT_POST, 1L)) != CURLE_OK) goto _OVER;
  if ((code = curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, (long)bufLen)) != CURLE_OK) goto _OVER;
  if ((code = curl_easy_setopt(curl, CURLOPT_POSTFIELDS, buf)) != CURLE_OK) goto _OVER;

  uDebugL("curl post request will sent, url:%s len:%d content:%s", url, bufLen, buf);
  code = curl_easy_perform(curl);
  if (code != CURLE_OK) {
    uError("failed to perform curl action, code:%d", code);
  }

_OVER:
  curl_slist_free_all(headers);
  curl_easy_cleanup(curl);
  return code;
}
347

348
/*
 * Send a request to an analysis node and parse the reply as JSON.
 *
 * ANALYTICS_HTTP_TYPE_GET issues a plain GET; any other type POSTs the
 * content of `pBuf`. `timeout` is only used for the POST.
 *
 * @return the parsed JSON document, or NULL with terrno set:
 *         the buffer error when the POST body cannot be read,
 *         TSDB_CODE_ANA_URL_CANT_ACCESS when the HTTP request fails,
 *         TSDB_CODE_ANA_URL_RSP_IS_NULL when the reply is empty,
 *         TSDB_CODE_ANA_ANODE_RETURN_ERROR when an unparsable reply starts with '<',
 *         TSDB_CODE_INVALID_JSON_FORMAT for any other unparsable reply.
 */
SJson *taosAnalySendReqRetJson(const char *url, EAnalyHttpType type, SAnalyticBuf *pBuf, int64_t timeout) {
  char     *pBody = NULL;
  int64_t   bodyLen = 0;
  SJson    *pReply = NULL;
  SCurlResp rsp = {0};

  if (type != ANALYTICS_HTTP_TYPE_GET) {
    int32_t code = taosAnalyBufGetCont(pBuf, &pBody, &bodyLen);
    if (code != 0) {
      terrno = code;
      goto _OVER;
    }

    if (taosCurlPostRequest(url, &rsp, pBody, bodyLen, timeout) != 0) {
      terrno = TSDB_CODE_ANA_URL_CANT_ACCESS;
      goto _OVER;
    }
  } else if (taosCurlGetRequest(url, &rsp) != 0) {
    terrno = TSDB_CODE_ANA_URL_CANT_ACCESS;
    goto _OVER;
  }

  if (rsp.data == NULL || rsp.dataLen == 0) {
    terrno = TSDB_CODE_ANA_URL_RSP_IS_NULL;
    goto _OVER;
  }

  pReply = tjsonParse(rsp.data);
  if (pReply == NULL) {
    // A reply starting with '<' is reported as an error returned by the node.
    terrno = (rsp.data[0] == '<') ? TSDB_CODE_ANA_ANODE_RETURN_ERROR : TSDB_CODE_INVALID_JSON_FORMAT;
  }

_OVER:
  if (rsp.data != NULL) taosMemoryFreeClear(rsp.data);
  if (pBody != NULL) taosMemoryFree(pBody);
  return pReply;
}
392

393
static int32_t taosAnalyJsonBufGetCont(const char *fileName, char **ppCont, int64_t *pContLen) {
×
394
  int32_t   code = 0;
×
395
  int64_t   contLen;
396
  char     *pCont = NULL;
×
397
  TdFilePtr pFile = NULL;
×
398

399
  pFile = taosOpenFile(fileName, TD_FILE_READ);
×
400
  if (pFile == NULL) {
×
401
    code = terrno;
×
402
    goto _OVER;
×
403
  }
404

405
  code = taosFStatFile(pFile, &contLen, NULL);
×
406
  if (code != 0) goto _OVER;
×
407

408
  pCont = taosMemoryMalloc(contLen + 1);
×
409
  if (pCont == NULL) {
×
410
    code = TSDB_CODE_OUT_OF_MEMORY;
×
411
    goto _OVER;
×
412
  }
413

414
  if (taosReadFile(pFile, pCont, contLen) != contLen) {
×
415
    code = terrno;
×
416
    goto _OVER;
×
417
  }
418

419
  pCont[contLen] = '\0';
×
420

421
_OVER:
×
422
  if (code == 0) {
×
423
    *ppCont = pCont;
×
424
    *pContLen = contLen;
×
425
  } else {
426
    if (pCont != NULL) taosMemoryFree(pCont);
×
427
  }
428
  if (pFile != NULL) taosCloseFile(&pFile);
×
429
  return code;
×
430
}
431

432
/*
 * Append one integer member, formatted as `"<optName>": <optVal>,` plus a
 * newline, to the buffer's main file. The line is built in a 64-byte scratch
 * buffer. Returns 0 on success, terrno when the write is short.
 */
static int32_t taosAnalyJsonBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) {
  char          line[64] = {0};
  const int32_t lineLen = tsnprintf(line, sizeof(line), "\"%s\": %" PRId64 ",\n", optName, optVal);

  const bool written = (taosWriteFile(pBuf->filePtr, line, lineLen) == lineLen);
  return written ? 0 : terrno;
}
440

441
/*
 * Append one string member, formatted as `"<optName>": "<optVal>",` plus a
 * newline, to the buffer's main file. The line is built in a 128-byte scratch
 * buffer and `optVal` is inserted as is (no JSON escaping).
 * Returns 0 on success, terrno when the write is short.
 */
static int32_t taosAnalyJsonBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) {
  char          line[128] = {0};
  const int32_t lineLen = tsnprintf(line, sizeof(line), "\"%s\": \"%s\",\n", optName, optVal);

  const bool written = (taosWriteFile(pBuf->filePtr, line, lineLen) == lineLen);
  return written ? 0 : terrno;
}
449

450
/*
 * Append one floating-point member, formatted with "%f" as
 * `"<optName>": <optVal>,` plus a newline, to the buffer's main file.
 * Returns 0 on success, terrno when the write is short.
 */
static int32_t taosAnalyJsonBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) {
  char          line[128] = {0};
  const int32_t lineLen = tsnprintf(line, sizeof(line), "\"%s\": %f,\n", optName, optVal);

  const bool written = (taosWriteFile(pBuf->filePtr, line, lineLen) == lineLen);
  return written ? 0 : terrno;
}
458

459
/*
 * Append raw text to the buffer's main file.
 *
 * @param buf     text to write
 * @param bufLen  number of bytes to write; when <= 0 the length is taken from
 *                strlen(buf), so `buf` must then be NUL-terminated
 * @return 0 on success, terrno when the write is short.
 */
static int32_t taosAnalyJsonBufWriteStr(SAnalyticBuf *pBuf, const char *buf, int32_t bufLen) {
  const int32_t nBytes = (bufLen > 0) ? bufLen : (int32_t)strlen(buf);

  if (taosWriteFile(pBuf->filePtr, buf, nBytes) == nBytes) {
    return 0;
  }
  return terrno;
}
468

469
/* Open the top-level JSON object by writing "{" and a newline to the main file. */
static int32_t taosAnalyJsonBufWriteStart(SAnalyticBuf *pBuf) {
  const char *opening = "{\n";
  return taosAnalyJsonBufWriteStr(pBuf, opening, 0);
}
×
470

471
/*
 * Create the files behind a JSON buffer and start the document.
 *
 * The main file (pBuf->fileName) is always created. For buffer types other
 * than ANALYTICS_BUF_TYPE_JSON, one extra file per column is created as well,
 * named "<fileName>-c<index>". Finally the opening "{" is written to the
 * main file.
 *
 * @return 0 on success; terrno when a file cannot be opened;
 *         TSDB_CODE_OUT_OF_MEMORY when the column array cannot be allocated.
 *         NOTE(review): nothing opened so far is closed on these early
 *         returns — presumably taosAnalyBufDestroy is expected to do that.
 */
static int32_t tsosAnalyJsonBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) {
  pBuf->filePtr = taosOpenFile(pBuf->fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
  if (pBuf->filePtr == NULL) {
    return terrno;
  }

  pBuf->pCols = taosMemoryCalloc(numOfCols, sizeof(SAnalyticsColBuf));
  if (pBuf->pCols == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pBuf->numOfCols = numOfCols;

  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON) {
    int32_t idx = 0;
    while (idx < numOfCols) {
      SAnalyticsColBuf *pCol = &pBuf->pCols[idx];

      snprintf(pCol->fileName, sizeof(pCol->fileName), "%s-c%d", pBuf->fileName, idx);
      pCol->filePtr =
          taosOpenFile(pCol->fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
      if (pCol->filePtr == NULL) {
        return terrno;
      }

      ++idx;
    }
  }

  return taosAnalyJsonBufWriteStart(pBuf);
}
497

498
/*
 * Write the schema entry of one column to the main file.
 *
 * Column 0 first opens the `"schema": [` array; every column then adds a
 * `["<name>", "<type name>", <type bytes>]` row (comma-terminated unless it is
 * the last column); the last column closes the array with `],`.
 * `colType` indexes tDataTypes without a range check.
 *
 * @return 0 on success, terrno when any write fails.
 */
static int32_t taosAnalyJsonBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) {
  const bool isFirst = (colIndex == 0);
  const bool isLast = (colIndex == pBuf->numOfCols - 1);

  if (isFirst && taosAnalyJsonBufWriteStr(pBuf, "\"schema\": [\n", 0) != 0) {
    return terrno;
  }

  char        row[128] = {0};
  const char *rowSep = isLast ? "" : ",";
  int32_t     rowLen = tsnprintf(row, sizeof(row), "  [\"%s\", \"%s\", %d]%s\n", colName, tDataTypes[colType].name,
                                 tDataTypes[colType].bytes, rowSep);
  if (taosWriteFile(pBuf->filePtr, row, rowLen) != rowLen) {
    return terrno;
  }

  if (isLast && taosAnalyJsonBufWriteStr(pBuf, "],\n", 0) != 0) {
    return terrno;
  }

  return 0;
}
523

524
/* Open the `"data": [` array in the main file. */
static int32_t taosAnalyJsonBufWriteDataBegin(SAnalyticBuf *pBuf) {
  const char *opening = "\"data\": [\n";
  return taosAnalyJsonBufWriteStr(pBuf, opening, 0);
}
527

528
/*
 * Append raw text to the file that collects the data of column `colIndex`:
 * the main file for ANALYTICS_BUF_TYPE_JSON buffers, the column's own file
 * for every other buffer type.
 *
 * @param bufLen  number of bytes to write; when <= 0 strlen(buf) is used
 * @return 0 on success, terrno when the write is short.
 */
static int32_t taosAnalyJsonBufWriteStrUseCol(SAnalyticBuf *pBuf, const char *buf, int32_t bufLen, int32_t colIndex) {
  const int32_t nBytes = (bufLen > 0) ? bufLen : (int32_t)strlen(buf);
  int32_t       ret = 0;

  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON) {
    ret = taosWriteFile(pBuf->pCols[colIndex].filePtr, buf, nBytes);
  } else {
    ret = taosWriteFile(pBuf->filePtr, buf, nBytes);
  }

  return (ret == nBytes) ? 0 : terrno;
}
547

548
/* Open the value array of column `colIndex` by writing "[" and a newline. */
static int32_t taosAnalyJsonBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) {
  const char *opening = "[\n";
  return taosAnalyJsonBufWriteStrUseCol(pBuf, opening, 0, colIndex);
}
551

552
/*
 * Close the value array of column `colIndex`. Every column but the last one
 * gets a trailing comma after the closing bracket.
 */
static int32_t taosAnalyJsonBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) {
  const bool  isLastCol = (colIndex == pBuf->numOfCols - 1);
  const char *closing = isLastCol ? "\n]\n" : "\n],\n";

  return taosAnalyJsonBufWriteStrUseCol(pBuf, closing, 0, colIndex);
}
560

561
/*
 * Append one value to the data of column `colIndex`.
 *
 * Every value after the first one of a column is preceded by ",\n". The value
 * itself is printed according to `colType`; types not handled by the switch
 * contribute no text. The column's row counter is incremented in every case.
 *
 * @param colValue  points at a value of the C type that matches `colType`
 * @return 0 on success, terrno when the write fails.
 */
static int32_t taosAnalyJsonBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) {
  // "%f" prints a double without an exponent, so a value near the top of the
  // double range needs more than 300 characters; a 64-byte buffer truncated
  // such numbers.
  char    buf[512];
  int32_t bufLen = 0;

  if (pBuf->pCols[colIndex].numOfRows != 0) {
    buf[bufLen] = ',';
    buf[bufLen + 1] = '\n';
    buf[bufLen + 2] = 0;
    bufLen += 2;
  }

  switch (colType) {
    case TSDB_DATA_TYPE_BOOL:
      bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", (*((int8_t *)colValue) == 1) ? 1 : 0);
      break;
    case TSDB_DATA_TYPE_TINYINT:
      bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", *(int8_t *)colValue);
      break;
    case TSDB_DATA_TYPE_UTINYINT:
      bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%u", *(uint8_t *)colValue);
      break;
    case TSDB_DATA_TYPE_SMALLINT:
      bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", *(int16_t *)colValue);
      break;
    case TSDB_DATA_TYPE_USMALLINT:
      bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%u", *(uint16_t *)colValue);
      break;
    case TSDB_DATA_TYPE_INT:
      bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", *(int32_t *)colValue);
      break;
    case TSDB_DATA_TYPE_UINT:
      bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%u", *(uint32_t *)colValue);
      break;
    case TSDB_DATA_TYPE_BIGINT:
    case TSDB_DATA_TYPE_TIMESTAMP:
      bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%" PRId64, *(int64_t *)colValue);
      break;
    case TSDB_DATA_TYPE_UBIGINT:
      bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%" PRIu64, *(uint64_t *)colValue);
      break;
    case TSDB_DATA_TYPE_FLOAT:
      bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%f", GET_FLOAT_VAL(colValue));
      break;
    case TSDB_DATA_TYPE_DOUBLE:
      bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%f", GET_DOUBLE_VAL(colValue));
      break;
    default:
      // Unsupported type: terminate the text after the separator (if any).
      buf[bufLen] = '\0';
  }

  pBuf->pCols[colIndex].numOfRows++;
  return taosAnalyJsonBufWriteStrUseCol(pBuf, buf, bufLen, colIndex);
}
614

615
/*
 * Finish the `"data"` array.
 *
 * For ANALYTICS_BUF_TYPE_JSON_COL buffers, every column file is synced,
 * closed, read back in full and appended to the main file, in column order.
 * The array is then closed with `],`.
 *
 * @return 0 on success, otherwise the code of the first step that failed.
 */
static int32_t taosAnalyJsonBufWriteDataEnd(SAnalyticBuf *pBuf) {
  int32_t code = 0;
  char   *pCont = NULL;
  int64_t contLen = 0;

  if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
    for (int32_t i = 0; i < pBuf->numOfCols; ++i) {
      SAnalyticsColBuf *pCol = &pBuf->pCols[i];

      code = taosFsyncFile(pCol->filePtr);
      if (code != 0) return code;

      code = taosCloseFile(&pCol->filePtr);
      if (code != 0) return code;

      code = taosAnalyJsonBufGetCont(pBuf->pCols[i].fileName, &pCont, &contLen);
      if (code != 0) return code;

      code = taosAnalyJsonBufWriteStr(pBuf, pCont, contLen);

      // Release the column content before looking at the result, so a failed
      // append no longer leaks it.
      taosMemoryFreeClear(pCont);
      contLen = 0;

      if (code != 0) return code;
    }
  }

  return taosAnalyJsonBufWriteStr(pBuf, "],\n", 0);
}
643

644
/*
 * Write the tail of the JSON document: a `"rows"` member holding the row
 * count of column 0, then `"protocol": 1.0` and the closing brace.
 * Reads pBuf->pCols[0], so the buffer must have at least one column.
 */
static int32_t taosAnalyJsonBufWriteEnd(SAnalyticBuf *pBuf) {
  int32_t rowsCode = taosAnalyJsonBufWriteOptInt(pBuf, "rows", pBuf->pCols[0].numOfRows);
  if (rowsCode != 0) {
    return rowsCode;
  }

  const char *tail = "\"protocol\": 1.0\n}";
  return taosAnalyJsonBufWriteStr(pBuf, tail, 0);
}
650

651
/*
 * Finish the JSON document and flush it to disk: write the document tail,
 * then sync and close the main file and, for ANALYTICS_BUF_TYPE_JSON_COL
 * buffers, every column file that is still open.
 *
 * @return 0 on success, otherwise the code of the first step that failed
 *         (the remaining files are then left as they are).
 */
int32_t taosAnalJsonBufClose(SAnalyticBuf *pBuf) {
  int32_t code = taosAnalyJsonBufWriteEnd(pBuf);
  if (code != 0) return code;

  if (pBuf->filePtr != NULL) {
    if ((code = taosFsyncFile(pBuf->filePtr)) != 0) return code;
    if ((code = taosCloseFile(&pBuf->filePtr)) != 0) return code;
  }

  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return 0;
  }

  for (int32_t col = 0; col < pBuf->numOfCols; ++col) {
    SAnalyticsColBuf *pCol = &pBuf->pCols[col];
    if (pCol->filePtr == NULL) {
      continue;  // already closed
    }

    if ((code = taosFsyncFile(pCol->filePtr)) != 0) return code;
    if ((code = taosCloseFile(&pCol->filePtr)) != 0) return code;
  }

  return 0;
}
676

677
/*
 * Release everything a buffer holds.
 *
 * The main file is closed if still open and its name is cleared; the file
 * itself is left on disk. For ANALYTICS_BUF_TYPE_JSON_COL buffers every
 * column file is closed and removed (a failed removal is only logged).
 * Finally the column array is freed and the column count reset.
 */
void taosAnalyBufDestroy(SAnalyticBuf *pBuf) {
  if (pBuf->fileName[0] != 0) {
    if (pBuf->filePtr != NULL) {
      (void)taosCloseFile(&pBuf->filePtr);
    }
    // the main file is intentionally not removed here
    pBuf->fileName[0] = 0;
  }

  if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
    int32_t col = 0;
    while (col < pBuf->numOfCols) {
      SAnalyticsColBuf *pCol = &pBuf->pCols[col++];
      if (pCol->fileName[0] == 0) {
        continue;  // nothing was created for this column
      }

      if (pCol->filePtr != NULL) {
        (void)taosCloseFile(&pCol->filePtr);
      }
      if (taosRemoveFile(pCol->fileName) != 0) {
        uError("failed to remove file %s", pCol->fileName);
      }
      pCol->fileName[0] = 0;
    }
  }

  taosMemoryFreeClear(pBuf->pCols);
  pBuf->numOfCols = 0;
}
×
700

701
/*
 * Open a buffer with `numOfCols` columns. Only the two JSON buffer types are
 * supported; any other type yields TSDB_CODE_ANA_BUF_INVALID_TYPE.
 */
int32_t tsosAnalyBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) {
  const bool isJson =
      (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON) || (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL);
  if (!isJson) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }

  return tsosAnalyJsonBufOpen(pBuf, numOfCols);
}
708

709
/*
 * Write a string option to the buffer. Only the two JSON buffer types are
 * supported; any other type yields TSDB_CODE_ANA_BUF_INVALID_TYPE.
 */
int32_t taosAnalyBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) {
  const bool isJson =
      (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON) || (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL);
  if (!isJson) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }

  return taosAnalyJsonBufWriteOptStr(pBuf, optName, optVal);
}
716

717
/*
 * Write an integer option to the buffer. Only the two JSON buffer types are
 * supported; any other type yields TSDB_CODE_ANA_BUF_INVALID_TYPE.
 */
int32_t taosAnalyBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) {
  const bool isJson =
      (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON) || (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL);
  if (!isJson) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }

  return taosAnalyJsonBufWriteOptInt(pBuf, optName, optVal);
}
724

725
/*
 * Write a floating-point option to the buffer. Only the two JSON buffer types
 * are supported; any other type yields TSDB_CODE_ANA_BUF_INVALID_TYPE.
 */
int32_t taosAnalyBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) {
  const bool isJson =
      (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON) || (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL);
  if (!isJson) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }

  return taosAnalyJsonBufWriteOptFloat(pBuf, optName, optVal);
}
732

733
/*
 * Write the schema entry of one column. Only the two JSON buffer types are
 * supported; any other type yields TSDB_CODE_ANA_BUF_INVALID_TYPE.
 */
int32_t taosAnalyBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) {
  const bool isJson =
      (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON) || (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL);
  if (!isJson) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }

  return taosAnalyJsonBufWriteColMeta(pBuf, colIndex, colType, colName);
}
740

741
/*
 * Start the data section of the buffer. Only the two JSON buffer types are
 * supported; any other type yields TSDB_CODE_ANA_BUF_INVALID_TYPE.
 */
int32_t taosAnalyBufWriteDataBegin(SAnalyticBuf *pBuf) {
  const bool isJson =
      (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON) || (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL);
  if (!isJson) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }

  return taosAnalyJsonBufWriteDataBegin(pBuf);
}
748

749
/*
 * Start the data of column `colIndex`. Only the two JSON buffer types are
 * supported; any other type yields TSDB_CODE_ANA_BUF_INVALID_TYPE.
 */
int32_t taosAnalyBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) {
  const bool isJson =
      (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON) || (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL);
  if (!isJson) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }

  return taosAnalyJsonBufWriteColBegin(pBuf, colIndex);
}
756

757
/*
 * Append one value to column `colIndex`. Only the two JSON buffer types are
 * supported; any other type yields TSDB_CODE_ANA_BUF_INVALID_TYPE.
 */
int32_t taosAnalyBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) {
  const bool isJson =
      (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON) || (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL);
  if (!isJson) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }

  return taosAnalyJsonBufWriteColData(pBuf, colIndex, colType, colValue);
}
764

765
/*
 * Finish the data of column `colIndex`. Only the two JSON buffer types are
 * supported; any other type yields TSDB_CODE_ANA_BUF_INVALID_TYPE.
 */
int32_t taosAnalyBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) {
  const bool isJson =
      (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON) || (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL);
  if (!isJson) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }

  return taosAnalyJsonBufWriteColEnd(pBuf, colIndex);
}
772

773
/*
 * Finish the data section of the buffer. Only the two JSON buffer types are
 * supported; any other type yields TSDB_CODE_ANA_BUF_INVALID_TYPE.
 */
int32_t taosAnalyBufWriteDataEnd(SAnalyticBuf *pBuf) {
  const bool isJson =
      (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON) || (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL);
  if (!isJson) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }

  return taosAnalyJsonBufWriteDataEnd(pBuf);
}
780

781
/*
 * Finish and close the buffer. Only the two JSON buffer types are supported;
 * any other type yields TSDB_CODE_ANA_BUF_INVALID_TYPE.
 */
int32_t taosAnalyBufClose(SAnalyticBuf *pBuf) {
  const bool isJson =
      (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON) || (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL);
  if (!isJson) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }

  return taosAnalJsonBufClose(pBuf);
}
788

789
/*
 * Load the whole content of the buffer's main file.
 *
 * The outputs are reset to NULL / 0 first. For the two JSON buffer types the
 * file named pBuf->fileName is read; any other type yields
 * TSDB_CODE_ANA_BUF_INVALID_TYPE. On success the caller frees *ppCont.
 */
static int32_t taosAnalyBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pContLen) {
  *ppCont = NULL;
  *pContLen = 0;

  const bool isJson =
      (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON) || (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL);
  if (!isJson) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }

  return taosAnalyJsonBufGetCont(pBuf->fileName, ppCont, pContLen);
}
799

800
// extract the timeout parameter
801
int64_t taosAnalysisParseTimout(SHashObj* pHashMap, const char* id) {
×
802
  int32_t code = 0;
×
803
  char* pTimeout = taosHashGet(pHashMap, ALGO_OPT_TIMEOUT_NAME, strlen(ALGO_OPT_TIMEOUT_NAME));
×
804
  if (pTimeout == NULL) {
×
805
    uDebug("%s not set the timeout val, set default:%d", id, ANALY_DEFAULT_TIMEOUT);
×
806
    return ANALY_DEFAULT_TIMEOUT;
×
807
  } else {
808
    int64_t t = taosStr2Int64(pTimeout, NULL, 10);
×
809
    if (t <= 0 || t > ANALY_MAX_TIMEOUT) {
×
810
      uDebug("%s timeout val:%" PRId64 "s is invalid (greater than 10min or less than 1s), use default:%dms", id, t,
×
811
             ANALY_DEFAULT_TIMEOUT);
812
      return ANALY_DEFAULT_TIMEOUT;
×
813
    }
814

815
    uDebug("%s timeout val is set to: %" PRId64 "s", id, t);
×
816
    return t;
×
817
  }
818
}
819

820
/*
 * Extract the algorithm name from a parsed option table and resolve its URL.
 *
 * @param pOpt       original option string, only used in the error log
 * @param pAlgoName  receives the algorithm name
 * @param pUrl       receives the URL of the algorithm
 * @param type       algorithm type used for the URL lookup
 * @param len        capacity of `pUrl`
 * @param pHashMap   option table, as produced by taosAnalyGetOpts
 * @param id         tag put in front of every log line
 * @return 0 on success; TSDB_CODE_ANA_ALGO_NOT_FOUND when the table has no
 *         ALGO_OPT_ALGO_NAME entry; TSDB_CODE_ANA_ALGO_NOT_LOAD when the URL
 *         lookup fails.
 */
int32_t taosAnalysisParseAlgo(const char* pOpt, char* pAlgoName, char* pUrl, int32_t type, int32_t len, SHashObj* pHashMap, const char* id) {
  char* pStored = taosHashGet(pHashMap, ALGO_OPT_ALGO_NAME, strlen(ALGO_OPT_ALGO_NAME));
  if (pStored == NULL) {
    uError("%s failed to get analysis algorithm name from %s", id, pOpt);
    return TSDB_CODE_ANA_ALGO_NOT_FOUND;
  }

  // NOTE(review): the copy is bounded by the size of the stored value, not by
  // the capacity of pAlgoName — confirm callers pass a large enough buffer.
  int32_t copyLen = taosHashGetValueSize(pStored) + 1;
  tstrncpy(pAlgoName, pStored, copyLen);

  int32_t urlCode = taosAnalyGetAlgoUrl(pAlgoName, type, pUrl, len);
  if (urlCode != 0) {
    uError("%s failed to get analysis algorithm url from %s", id, pAlgoName);
    return TSDB_CODE_ANA_ALGO_NOT_LOAD;
  }

  return 0;
}
836

837
/*
 * Extract the wncheck option from a parsed option table.
 *
 * @return the value stored under ALGO_OPT_WNCHECK_NAME, parsed as a base-10
 *         integer and narrowed to int8_t, or ANALY_FORECAST_DEFAULT_WNCHECK
 *         when the option is missing.
 */
int8_t taosAnalysisParseWncheck(SHashObj* pHashMap, const char* id) {
  char* pRaw = taosHashGet(pHashMap, ALGO_OPT_WNCHECK_NAME, strlen(ALGO_OPT_WNCHECK_NAME));
  if (pRaw == NULL) {
    uDebug("%s analysis wncheck not found, use default:%d", id, ANALY_FORECAST_DEFAULT_WNCHECK);
    return ANALY_FORECAST_DEFAULT_WNCHECK;
  }

  int32_t flag = (int32_t) taosStr2Int64(pRaw, NULL, 10);
  uDebug("%s analysis wncheck:%d", id, flag);
  return flag;
}
848

849
#else
850

851
int32_t taosAnalyticsInit() { return 0; }
852
void    taosAnalyticsCleanup() {}
853
SJson  *taosAnalySendReqRetJson(const char *url, EAnalyHttpType type, SAnalyticBuf *pBuf, int64_t timeout) {
854
  return NULL;
855
}
856

857
int32_t taosAnalyGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { return 0; }
858
bool    taosAnalyGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) { return true; }
859
int64_t taosAnalyGetVersion() { return 0; }
860
void    taosAnalyUpdate(int64_t newVer, SHashObj *pHash) {}
861

862
/*
 * Analytics support is compiled out: every buffer operation below does
 * nothing and reports success.
 */
int32_t tsosAnalyBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) {
  return 0;
}

int32_t taosAnalyBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) {
  return 0;
}

int32_t taosAnalyBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) {
  return 0;
}

int32_t taosAnalyBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) {
  return 0;
}

int32_t taosAnalyBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) {
  return 0;
}

int32_t taosAnalyBufWriteDataBegin(SAnalyticBuf *pBuf) {
  return 0;
}

int32_t taosAnalyBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) {
  return 0;
}

int32_t taosAnalyBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) {
  return 0;
}

int32_t taosAnalyBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) {
  return 0;
}

int32_t taosAnalyBufWriteDataEnd(SAnalyticBuf *pBuf) {
  return 0;
}

int32_t taosAnalyBufClose(SAnalyticBuf *pBuf) {
  return 0;
}

/* Analytics support is compiled out: nothing to release. */
void taosAnalyBufDestroy(SAnalyticBuf *pBuf) {
}
876

877
/*
 * Analytics support is compiled out: the name helpers return a null pointer
 * and the type lookup returns 0.
 */
const char   *taosAnalysisAlgoType(EAnalAlgoType algoType) { return 0; }

/*
 * The enabled build of this file defines taosAnalyAlgoInt and
 * taosAnalyAlgoUrlStr, while the stubs were only provided under the names
 * taosAnalAlgoInt and taosAnalAlgoUrlStr. Provide both spellings so that the
 * two build variants export the same symbols; the old spellings are kept for
 * any code that already links against them.
 * NOTE(review): confirm against tanalytics.h which spelling is declared, then
 * drop the other one.
 */
EAnalAlgoType taosAnalyAlgoInt(const char *algoName) { return 0; }
const char   *taosAnalyAlgoUrlStr(EAnalAlgoType algoType) { return 0; }
EAnalAlgoType taosAnalAlgoInt(const char *algoName) { return 0; }
const char   *taosAnalAlgoUrlStr(EAnalAlgoType algoType) { return 0; }
880

881
/* Analytics support is compiled out: no timeout is parsed, 0 is returned. */
int64_t taosAnalysisParseTimout(SHashObj *pHashMap, const char *id) {
  return 0;
}

/* Analytics support is compiled out: reports success and leaves `pAlgoName` and `pUrl` untouched. */
int32_t taosAnalysisParseAlgo(const char *pOpt, char *pAlgoName, char *pUrl, int32_t type, int32_t len,
                              SHashObj *pHashMap, const char *id) {
  return 0;
}

/* Analytics support is compiled out: wncheck is always reported as 0. */
int8_t taosAnalysisParseWncheck(SHashObj* pHashMap, const char* id) {
  return 0;
}

/* Analytics support is compiled out: reports success and leaves `*pOptHash` untouched. */
int32_t taosAnalyGetOpts(const char *pOption, SHashObj **pOptHash) {
  return 0;
}
890

891
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc