• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3676

22 Mar 2025 04:46PM UTC coverage: 25.147% (-36.8%) from 61.952%
#3676

push

travis-ci

web-flow
fix: userOperTest in linux (#30363)

Co-authored-by: taos-support <it@taosdata.com>

55963 of 304767 branches covered (18.36%)

Branch coverage included in aggregate %.

96374 of 301020 relevant lines covered (32.02%)

582640.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/libs/executor/src/streamclient.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#ifndef WINDOWS
17

18
#include <curl/curl.h>
19

20
#endif
21

22
#include "executorInt.h"
23
#include "streamsession.h"
24
#include "tjson.h"
25

26
#ifndef WINDOWS
27

28
/*
 * Builds one batch of range queries into pParam->pSql: one "select" per session key taken from
 * pRangeMap, joined with " union all ". Iteration resumes from the cursor kept in
 * pParam->pIteData / pParam->iter, so repeated calls walk the map batch by batch.
 *
 * pRangeMap: hash whose keys are read as SSessionKey.
 * pParam:    provides the SQL buffer (pSql, sqlCapcity), the iteration cursor, and the column/table
 *            names and gap substituted into every select.
 * pEnd:      set to true when the iteration reached the end of the map, false when the buffer
 *            filled up before that.
 *
 * Always returns TSDB_CODE_SUCCESS.
 *
 * NOTE(review): when a statement does not fit, its key has already been consumed from the iterator;
 * confirm the next call does not silently skip that key.
 */
static int32_t buildSessionResultSql(SSHashObj* pRangeMap, SStreamRecParam* pParam, bool* pEnd) {
  int64_t prevLen = 0;  // length of the SQL text up to the last statement that fit completely
  int64_t len = 0;      // length of the SQL text written so far, including a possibly truncated tail

  (void)memset(pParam->pSql, 0, pParam->sqlCapcity);
  *pEnd = true;
  while ((pParam->pIteData = tSimpleHashIterate(pRangeMap, pParam->pIteData, &pParam->iter)) != NULL) {
    SSessionKey* pKey = tSimpleHashGetKey(pParam->pIteData, NULL);
    if (prevLen > 0) {
      len += tsnprintf(pParam->pSql + len, pParam->sqlCapcity - len, " union all ");
      // A length that reaches the last usable byte is treated as "buffer full".
      if (len >= pParam->sqlCapcity - 1) {
        *pEnd = false;
        break;
      }
    }
    len += tsnprintf(pParam->pSql + len, pParam->sqlCapcity - len,
                     "select %" PRId64 ", %" PRId64 ", `%s` , cast(`%s` as bigint), cast( (`%s` - %" PRId64
                     ") as bigint) from %s where `%s` == %" PRIu64 " and ( (`%s` - %" PRId64 ") >= %" PRId64
                     " and `%s` <= %" PRId64 " )",
                     pKey->win.skey, pKey->win.ekey, pParam->pGroupIdName, pParam->pWstartName, pParam->pWendName,
                     pParam->gap, pParam->pStbFullName, pParam->pGroupIdName, pKey->groupId, pParam->pWendName,
                     pParam->gap, pKey->win.skey, pParam->pWstartName, pKey->win.ekey);
    if (len >= pParam->sqlCapcity - 1) {
      *pEnd = false;
      break;
    }
    prevLen = len;
  }

  // Cut the text back to the end of the last complete statement. Terminating at prevLen (not
  // prevLen + 1) drops the whole discarded fragment instead of keeping its first character.
  pParam->pSql[prevLen] = '\0';
  return TSDB_CODE_SUCCESS;
}
59

60
static size_t parseResult(char* pCont, size_t contLen, size_t nmemb, void* userdata) {
×
61
  int32_t code = TSDB_CODE_SUCCESS;
×
62
  int32_t lino = 0;
×
63
  qDebug("stream client response is received, contLen:%d, nmemb:%d, pCont:%p", (int32_t)contLen, (int32_t)nmemb, pCont);
×
64
  QUERY_CHECK_CONDITION(contLen > 0, code, lino, _end, TSDB_CODE_FAILED);
×
65
  QUERY_CHECK_CONDITION(nmemb > CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
×
66
  QUERY_CHECK_NULL(pCont, code, lino, _end, TSDB_CODE_FAILED);
×
67

68
  qTrace("===stream=== result:%s", pCont);
×
69
  (*(SJson**)userdata) = tjsonParse(pCont);
×
70

71
_end:
×
72
  if (code != TSDB_CODE_SUCCESS) {
×
73
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
74
    return 0;
×
75
  }
76
  return contLen * nmemb;
×
77
}
78

79
static int32_t doProcessSql(SStreamRecParam* pParam, SJson** ppJsonResult) {
×
80
  int32_t code = TSDB_CODE_SUCCESS;
×
81
  int32_t lino = 0;
×
82

83
  CURL* pCurl = curl_easy_init();
×
84
  QUERY_CHECK_NULL(pCurl, code, lino, _end, TSDB_CODE_FAILED);
×
85

86
  CURLcode curlRes = curl_easy_setopt(pCurl, CURLOPT_URL, pParam->pUrl);
×
87
  QUERY_CHECK_CONDITION(curlRes == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
×
88

89
  struct curl_slist* pHeaders = NULL;
×
90
  pHeaders = curl_slist_append(pHeaders, "Content-Type:application/json;charset=UTF-8");
×
91
  QUERY_CHECK_NULL(pHeaders, code, lino, _end, TSDB_CODE_FAILED);
×
92
  pHeaders = curl_slist_append(pHeaders, pParam->pAuth);
×
93
  QUERY_CHECK_NULL(pHeaders, code, lino, _end, TSDB_CODE_FAILED);
×
94

95
  curlRes = curl_easy_setopt(pCurl, CURLOPT_HTTPHEADER, pHeaders);
×
96
  QUERY_CHECK_CONDITION(curlRes == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
×
97

98
  curlRes = curl_easy_setopt(pCurl, CURLOPT_POSTFIELDS, pParam->pSql);
×
99
  QUERY_CHECK_CONDITION(curlRes == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
×
100

101
  qDebug("===stream=== sql:%s", pParam->pSql);
×
102

103
  curlRes = curl_easy_setopt(pCurl, CURLOPT_FOLLOWLOCATION, 1L);
×
104
  QUERY_CHECK_CONDITION(curlRes == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
×
105

106
  curlRes = curl_easy_setopt(pCurl, CURLOPT_WRITEFUNCTION, parseResult);
×
107
  QUERY_CHECK_CONDITION(curlRes == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
×
108

109
  curlRes = curl_easy_setopt(pCurl, CURLOPT_WRITEDATA, ppJsonResult);
×
110
  QUERY_CHECK_CONDITION(curlRes == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
×
111

112
  curlRes = curl_easy_perform(pCurl);
×
113
  if (curlRes != CURLE_OK) {
×
114
    qError("error: unable to request data from %s.since %s. res code:%d", pParam->pUrl, curl_easy_strerror(curlRes),
×
115
           (int32_t)curlRes);
116
    QUERY_CHECK_CONDITION(curlRes == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
×
117
  }
118

119
_end:
×
120
  if (pHeaders != NULL) {
×
121
    curl_slist_free_all(pHeaders);
×
122
  }
123
  if (pCurl != NULL) {
×
124
    curl_easy_cleanup(pCurl);
×
125
  }
126
  if (code != TSDB_CODE_SUCCESS) {
×
127
    qError("%s failed at line %d since %s. error code:%d", __func__, lino, tstrerror(code), curlRes);
×
128
  }
129
  return code;
×
130
}
131

132
/*
 * Converts the "data" array of a JSON response into rows of int64_t values.
 *
 * pJsonResult: parsed response; must contain a "data" member, otherwise TSDB_CODE_FAILED is returned.
 * pRangeRes:   array of SArray*; one SArray of int64_t is appended per non-empty row of "data".
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_FAILED for a malformed document, or terrno when an array
 * operation fails. Rows appended before a failure stay in pRangeRes.
 */
static int32_t doTransformResult(const SJson* pJsonResult, SArray* pRangeRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SArray* pPending = NULL;  // row that has been created but is not yet held by pRangeRes

  SJson* jArray = tjsonGetObjectItem(pJsonResult, "data");
  QUERY_CHECK_NULL(jArray, code, lino, _end, TSDB_CODE_FAILED);

  int32_t rows = tjsonGetArraySize(jArray);
  for (int32_t i = 0; i < rows; ++i) {
    SJson* pRow = tjsonGetArrayItem(jArray, i);
    // Validate the row just fetched (the check used to re-test jArray).
    QUERY_CHECK_NULL(pRow, code, lino, _end, TSDB_CODE_FAILED);

    int32_t cols = tjsonGetArraySize(pRow);
    if (cols <= 0) {
      continue;
    }

    pPending = taosArrayInit(cols, sizeof(int64_t));
    QUERY_CHECK_NULL(pPending, code, lino, _end, terrno);
    void* tmpRes = taosArrayPush(pRangeRes, &pPending);
    QUERY_CHECK_NULL(tmpRes, code, lino, _end, terrno);

    // pRangeRes now holds the row pointer, so it must no longer be destroyed at _end.
    SArray* pRowArray = pPending;
    pPending = NULL;

    for (int32_t j = 0; j < cols; ++j) {
      SJson* pCell = tjsonGetArrayItem(pRow, j);
      QUERY_CHECK_NULL(pCell, code, lino, _end, TSDB_CODE_FAILED);
      int64_t data = 0;
      tjsonGetObjectValueBigInt(pCell, &data);
      tmpRes = taosArrayPush(pRowArray, &data);
      QUERY_CHECK_NULL(tmpRes, code, lino, _end, terrno);
    }
  }

_end:
  if (pPending != NULL) {
    // Only reached when appending the row to pRangeRes failed.
    taosArrayDestroy(pPending);
  }
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
167

168
/*
 * Queries the result ranges for every session key in pRangeMap and appends them to pRangeRes.
 *
 * The keys are processed in batches: each pass builds as many select statements as fit into
 * pParam->pSql, sends them with doProcessSql and converts the reply with doTransformResult,
 * until buildSessionResultSql reports that the map has been exhausted.
 *
 * pParam:    request parameters; its iteration cursor (pIteData, iter) is reset here.
 * pRangeMap: hash of session keys to query.
 * pRangeRes: receives one row array per returned result row.
 *
 * Returns TSDB_CODE_SUCCESS or the code of the first step that failed.
 */
int32_t streamClientGetResultRange(SStreamRecParam* pParam, SSHashObj* pRangeMap, SArray* pRangeRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SJson*  pJsRes = NULL;  // reply of the current batch; released after each pass and at _end
  bool    isEnd = false;

  pParam->pIteData = NULL;
  pParam->iter = 0;
  while (!isEnd) {
    code = buildSessionResultSql(pRangeMap, pParam, &isEnd);
    QUERY_CHECK_CODE(code, lino, _end);

    code = doProcessSql(pParam, &pJsRes);
    QUERY_CHECK_CODE(code, lino, _end);
    code = doTransformResult(pJsRes, pRangeRes);
    QUERY_CHECK_CODE(code, lino, _end);

    // The values were copied into pRangeRes, so the parsed document is no longer needed.
    tjsonDelete(pJsRes);
    pJsRes = NULL;
  }

_end:
  if (pJsRes != NULL) {
    tjsonDelete(pJsRes);
  }
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
192

193
/*
 * Writes into pParam->pSql a query made of two selects joined by "union all": the closest row
 * before pKey->ts (order by 1 desc limit 1) and the closest row after it (order by 1 asc limit 1),
 * both restricted to group pKey->groupId and to rows whose window-filled column equals 0.
 *
 * The buffer is zeroed first; the formatted length is not inspected.
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t buildFillResultSql(SWinKey* pKey, SStreamRecParam* pParam) {
  char*       pSql = pParam->pSql;
  const char* pWstart = pParam->pWstartName;
  const char* pStb = pParam->pStbFullName;
  const char* pGroupId = pParam->pGroupIdName;
  const char* pFilled = pParam->pIsWindowFilledName;

  (void)memset(pSql, 0, pParam->sqlCapcity);
  (void)tsnprintf(
      pSql, pParam->sqlCapcity,
      "(select *, cast(`%s` as bigint) from %s where `%s` == %" PRIu64 " and `%s` < %" PRId64
      " and `%s` ==0 order by 1 desc limit 1) union all (select *, cast(`%s` as bigint) from  %s where `%s` == %" PRIu64
      " and `%s` > %" PRId64 " and `%s` ==0 order by 1 asc limit 1)",
      // first select: nearest row before the key
      pWstart, pStb, pGroupId, pKey->groupId, pWstart, pKey->ts, pFilled,
      // second select: nearest row after the key
      pWstart, pStb, pGroupId, pKey->groupId, pWstart, pKey->ts, pFilled);

  return TSDB_CODE_SUCCESS;
}
206

207
/*
 * Stores the value of one JSON cell into pCell according to pCell->type.
 *
 * Integer and floating-point types are read as numbers, booleans via cJSON_IsTrue, and every
 * remaining type is copied as a var-string. Timestamp cells are left unchanged.
 * A cell of a string type whose JSON item carries no string value is marked as null instead of
 * being copied.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t jsonToDataCell(const SJson* pJson, SResultCellData* pCell) {
  int32_t code = TSDB_CODE_SUCCESS;
  if (IS_INTEGER_TYPE(pCell->type)) {
    int64_t data = 0;
    tjsonGetObjectValueBigInt(pJson, &data);
    SET_TYPED_DATA(pCell->pData, pCell->type, data);
  } else if (IS_FLOAT_TYPE(pCell->type)) {
    double data = 0;
    tjsonGetObjectValueDouble(pJson, &data);
    SET_TYPED_DATA(pCell->pData, pCell->type, data);
  } else if (IS_TIMESTAMP_TYPE(pCell->type)) {
    // No value is written for timestamp cells.
    // NOTE(review): presumably the timestamp is carried by the row key instead - confirm.
  } else if (IS_BOOLEAN_TYPE(pCell->type)) {
    bool data = cJSON_IsTrue(pJson) ? true : false;
    SET_TYPED_DATA(pCell->pData, pCell->type, data);
  } else {
    char* pStr = cJSON_GetStringValue(pJson);
    if (pStr == NULL) {
      // cJSON_GetStringValue yields NULL for items that are not strings (e.g. JSON null);
      // copying from it would dereference a NULL pointer.
      pCell->isNull = true;
    } else {
      // NOTE(review): the copy is not bounded here; verify the cell buffer is large enough.
      STR_TO_VARSTR(pCell->pData, pStr);
    }
  }

  return code;
}
228

229
/*
 * Looks up colId in pMap and returns the int32_t stored for it, or -1 when the map has no
 * entry for that id.
 */
static int32_t getColumnIndex(SSHashObj* pMap, int32_t colId) {
  const int32_t* pIndex = tSimpleHashGet(pMap, &colId, sizeof(colId));
  return (pIndex != NULL) ? *pIndex : -1;
}
236

237
/*
 * Converts the "data" array of a JSON response into SSliceRowData rows appended to pRangeRes.
 *
 * For every JSON row a buffer of sizeof(TSKEY) + size bytes is allocated, initialised from
 * pEmptyRow, and each of the numOfCols result cells is filled from the JSON column that pMap
 * assigns to it. Cells without a usable mapping are marked as null. When the JSON row has more
 * than one column, its last column is stored as the row key; otherwise the key stays INT64_MIN.
 *
 * pJsonResult: parsed response; must contain a "data" member.
 * pRangeRes:   array of SSliceRowData*; receives one pointer per JSON row.
 * pEmptyRow:   template copied into every new row (size bytes).
 * size:        byte size of the row value area.
 * pOffsetInfo: offsets handed to getSliceResultCell to locate each cell.
 * numOfCols:   number of result cells per row.
 * pMap:        maps a result cell index to its column index in the JSON row.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_FAILED for a malformed document or cell lookup failure, or
 * terrno when an allocation or array operation fails. A row that could not be appended is freed.
 */
static int32_t doTransformFillResult(const SJson* pJsonResult, SArray* pRangeRes, void* pEmptyRow, int32_t size,
                                     int32_t* pOffsetInfo, int32_t numOfCols, SSHashObj* pMap) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SSliceRowData* pRowData = NULL;  // row under construction; NULL once pRangeRes holds it

  SJson* jArray = tjsonGetObjectItem(pJsonResult, "data");
  QUERY_CHECK_NULL(jArray, code, lino, _end, TSDB_CODE_FAILED);

  int32_t rows = tjsonGetArraySize(jArray);
  for (int32_t i = 0; i < rows; ++i) {
    SJson* pRow = tjsonGetArrayItem(jArray, i);
    // Validate the row just fetched (the check used to re-test jArray).
    QUERY_CHECK_NULL(pRow, code, lino, _end, TSDB_CODE_FAILED);
    int32_t cols = tjsonGetArraySize(pRow);

    pRowData = taosMemoryCalloc(1, sizeof(TSKEY) + size);
    QUERY_CHECK_NULL(pRowData, code, lino, _end, terrno);
    pRowData->key = INT64_MIN;
    memcpy(pRowData->pRowVal, pEmptyRow, size);

    for (int32_t j = 0; j < numOfCols; ++j) {
      SResultCellData* pDataCell = getSliceResultCell((SResultCellData*)pRowData->pRowVal, j, pOffsetInfo);
      QUERY_CHECK_NULL(pDataCell, code, lino, _end, TSDB_CODE_FAILED);

      int32_t colIndex = getColumnIndex(pMap, j);
      if (colIndex == -1 || colIndex >= cols) {
        qDebug("invalid result columm index:%d", colIndex);
        pDataCell->isNull = true;
        continue;
      }

      SJson* pJsonCell = tjsonGetArrayItem(pRow, colIndex);
      QUERY_CHECK_NULL(pJsonCell, code, lino, _end, TSDB_CODE_FAILED);

      code = jsonToDataCell(pJsonCell, pDataCell);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    if (cols > 1) {
      // The last JSON column supplies the row key.
      SJson* pJsonSKey = tjsonGetArrayItem(pRow, cols - 1);
      QUERY_CHECK_NULL(pJsonSKey, code, lino, _end, TSDB_CODE_FAILED);
      tjsonGetObjectValueBigInt(pJsonSKey, &pRowData->key);
    }

    void* pTempRes = taosArrayPush(pRangeRes, &pRowData);
    QUERY_CHECK_NULL(pTempRes, code, lino, _end, terrno);
    pRowData = NULL;  // the pointer is stored in pRangeRes now
  }

_end:
  if (pRowData != NULL) {
    // An error occurred before the row reached pRangeRes; release its buffer here.
    taosMemoryFree(pRowData);
  }
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
289

290
/*
 * Fetches the rows adjacent to pKey (the nearest one before and the nearest one after pKey->ts
 * within the same group) and appends them to pRangeRes as SSliceRowData rows.
 *
 * pParam:      request parameters; pParam->pSql is overwritten with the generated query and
 *              pParam->pColIdMap is used to map result cells to response columns.
 * pKey:        group id and timestamp to search around.
 * pRangeRes:   receives the converted rows.
 * pEmptyRow, size, pOffsetInfo, numOfCols: row template and layout forwarded to
 *              doTransformFillResult.
 *
 * Returns TSDB_CODE_SUCCESS or the code of the first step that failed.
 */
int32_t streamClientGetFillRange(SStreamRecParam* pParam, SWinKey* pKey, SArray* pRangeRes, void* pEmptyRow, int32_t size,
                                 int32_t* pOffsetInfo, int32_t numOfCols) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SJson*  pJsRes = NULL;  // parsed reply; released at _end on every path

  code = buildFillResultSql(pKey, pParam);
  QUERY_CHECK_CODE(code, lino, _end);

  code = doProcessSql(pParam, &pJsRes);
  QUERY_CHECK_CODE(code, lino, _end);
  code = doTransformFillResult(pJsRes, pRangeRes, pEmptyRow, size, pOffsetInfo, numOfCols, pParam->pColIdMap);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (pJsRes != NULL) {
    // The cell values were copied into the rows, so the document can be dropped.
    tjsonDelete(pJsRes);
  }
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
310

311
/*
 * Verifies that the endpoint described by pParam answers queries: sends a fixed query against
 * information_schema.ins_databases and inspects the "data" array of the reply.
 *
 * pParam: request parameters; pParam->pSql is overwritten with the test query.
 *
 * Returns TSDB_CODE_SUCCESS when the reply contains at least two rows,
 * TSDB_CODE_INVALID_CFG_VALUE when it contains fewer, TSDB_CODE_FAILED when the reply has no
 * "data" member, or the error reported by doProcessSql.
 */
int32_t streamClientCheckCfg(SStreamRecParam* pParam) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SJson*  pJsRes = NULL;  // parsed reply; released at _end on every path

  const char* pTestSql = "select name, ntables, status from information_schema.ins_databases;";
  (void)memset(pParam->pSql, 0, pParam->sqlCapcity);
  tstrncpy(pParam->pSql, pTestSql, pParam->sqlCapcity);

  code = doProcessSql(pParam, &pJsRes);
  QUERY_CHECK_CODE(code, lino, _end);
  SJson* jArray = tjsonGetObjectItem(pJsRes, "data");
  QUERY_CHECK_NULL(jArray, code, lino, _end, TSDB_CODE_FAILED);

  int32_t rows = tjsonGetArraySize(jArray);
  if (rows < 2) {
    code = TSDB_CODE_INVALID_CFG_VALUE;
    qError("invalid taos adapter config value");
  }

_end:
  if (pJsRes != NULL) {
    tjsonDelete(pJsRes);
  }
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
337

338
#else
339

340
/* Stub compiled when WINDOWS is defined: the curl-based client is unavailable, so it always fails. */
int32_t streamClientGetResultRange(SStreamRecParam* pParam, SSHashObj* pRangeMap, SArray* pRangeRes) {
  (void)pParam;
  (void)pRangeMap;
  (void)pRangeRes;
  return TSDB_CODE_FAILED;
}
343
/* Stub compiled when WINDOWS is defined: the curl-based client is unavailable, so it always fails. */
int32_t streamClientGetFillRange(SStreamRecParam* pParam, SWinKey* pKey, SArray* pRangeRes, void* pEmptyRow, int32_t size,
                                 int32_t* pOffsetInfo, int32_t numOfCols) {
  (void)pParam;
  (void)pKey;
  (void)pRangeRes;
  (void)pEmptyRow;
  (void)size;
  (void)pOffsetInfo;
  (void)numOfCols;
  return TSDB_CODE_FAILED;
}
346

347
/* Stub compiled when WINDOWS is defined: the curl-based client is unavailable, so it always fails. */
int32_t streamClientCheckCfg(SStreamRecParam* pParam) {
  (void)pParam;
  return TSDB_CODE_FAILED;
}
350

351
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc