• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3590

24 Jan 2025 08:57AM UTC coverage: 63.566% (+0.4%) from 63.191%
#3590

push

travis-ci

web-flow
Merge pull request #29638 from taosdata/docs/TS-5846-3.0

enh: TDengine modify taosBenchmark new query rule cases and add doc

141313 of 285630 branches covered (49.47%)

Branch coverage included in aggregate %.

220046 of 282844 relevant lines covered (77.8%)

18706734.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

11.2
/source/dnode/vnode/src/tq/tqStreamNotify.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cmdnodes.h"
17
#include "tq.h"
18

19
#ifndef WINDOWS
20
#include "curl/curl.h"
21
#endif
22

23
#define STREAM_EVENT_NOTIFY_RETRY_MS 50  // 50ms
24

25
typedef struct SStreamNotifyHandle {
26
  TdThreadMutex mutex;
27
#ifndef WINDOWS
28
  CURL*         curl;
29
#endif
30
  char*         url;
31
} SStreamNotifyHandle;
32

33
struct SStreamNotifyHandleMap {
34
  TdThreadMutex gMutex;
35
  SHashObj*     handleMap;
36
};
37

38
static void stopStreamNotifyConn(SStreamNotifyHandle* pHandle) {
×
39
#ifndef WINDOWS
40
  if (pHandle == NULL || pHandle->curl == NULL) {
×
41
    return;
×
42
  }
43
  // status code 1000 means normal closure
44
  size_t   len = 0;
×
45
  uint16_t status = htons(1000);
×
46
  CURLcode res = curl_ws_send(pHandle->curl, &status, sizeof(status), &len, 0, CURLWS_CLOSE);
×
47
  if (res != CURLE_OK) {
×
48
    tqWarn("failed to send ws-close msg to %s for %d", pHandle->url ? pHandle->url : "", res);
×
49
  }
50
  // TODO: add wait mechanism for peer connection close response
51
  curl_easy_cleanup(pHandle->curl);
×
52
#endif
53
}
54

55
static void destroyStreamNotifyHandle(void* ptr) {
×
56
  int32_t               code = TSDB_CODE_SUCCESS;
×
57
  int32_t               lino = 0;
×
58
  SStreamNotifyHandle** ppHandle = ptr;
×
59

60
  if (ppHandle == NULL || *ppHandle == NULL) {
×
61
    return;
×
62
  }
63
  code = taosThreadMutexDestroy(&(*ppHandle)->mutex);
×
64
  stopStreamNotifyConn(*ppHandle);
×
65
  taosMemoryFreeClear((*ppHandle)->url);
×
66
  taosMemoryFreeClear(*ppHandle);
×
67
}
68

69
static void releaseStreamNotifyHandle(SStreamNotifyHandle** ppHandle) {
×
70
  if (ppHandle == NULL || *ppHandle == NULL) {
×
71
    return;
×
72
  }
73
  (void)taosThreadMutexUnlock(&(*ppHandle)->mutex);
×
74
  *ppHandle = NULL;
×
75
}
76

77
static int32_t acquireStreamNotifyHandle(SStreamNotifyHandleMap* pMap, const char* url,
×
78
                                         SStreamNotifyHandle** ppHandle) {
79
#ifndef WINDOWS
80
  int32_t               code = TSDB_CODE_SUCCESS;
×
81
  int32_t               lino = 0;
×
82
  bool                  gLocked = false;
×
83
  SStreamNotifyHandle** ppFindHandle = NULL;
×
84
  SStreamNotifyHandle*  pNewHandle = NULL;
×
85
  CURL*                 newCurl = NULL;
×
86
  CURLcode              res = CURLE_OK;
×
87

88
  TSDB_CHECK_NULL(pMap, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
89
  TSDB_CHECK_NULL(url, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
90
  TSDB_CHECK_NULL(ppHandle, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
91

92
  *ppHandle = NULL;
×
93

94
  code = taosThreadMutexLock(&pMap->gMutex);
×
95
  TSDB_CHECK_CODE(code, lino, _end);
×
96
  gLocked = true;
×
97

98
  ppFindHandle = taosHashGet(pMap->handleMap, url, strlen(url));
×
99
  if (ppFindHandle == NULL) {
×
100
    pNewHandle = taosMemoryCalloc(1, sizeof(SStreamNotifyHandle));
×
101
    TSDB_CHECK_NULL(pNewHandle, code, lino, _end, terrno);
×
102
    code = taosThreadMutexInit(&pNewHandle->mutex, NULL);
×
103
    TSDB_CHECK_CODE(code, lino, _end);
×
104
    code = taosHashPut(pMap->handleMap, url, strlen(url), &pNewHandle, POINTER_BYTES);
×
105
    TSDB_CHECK_CODE(code, lino, _end);
×
106
    *ppHandle = pNewHandle;
×
107
    pNewHandle = NULL;
×
108
  } else {
109
    *ppHandle = *ppFindHandle;
×
110
  }
111

112
  code = taosThreadMutexLock(&(*ppHandle)->mutex);
×
113
  TSDB_CHECK_CODE(code, lino, _end);
×
114

115
  (void)taosThreadMutexUnlock(&pMap->gMutex);
×
116
  gLocked = false;
×
117

118
  if ((*ppHandle)->curl == NULL) {
×
119
    newCurl = curl_easy_init();
×
120
    TSDB_CHECK_NULL(newCurl, code, lino, _end, TSDB_CODE_FAILED);
×
121
    res = curl_easy_setopt(newCurl, CURLOPT_URL, url);
×
122
    TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
×
123
    res = curl_easy_setopt(newCurl, CURLOPT_SSL_VERIFYPEER, 0L);
×
124
    TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
×
125
    res = curl_easy_setopt(newCurl, CURLOPT_SSL_VERIFYHOST, 0L);
×
126
    TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
×
127
    res = curl_easy_setopt(newCurl, CURLOPT_TIMEOUT, 3L);
×
128
    TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
×
129
    res = curl_easy_setopt(newCurl, CURLOPT_CONNECT_ONLY, 2L);
×
130
    TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
×
131
    res = curl_easy_perform(newCurl);
×
132
    TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
×
133
    (*ppHandle)->curl = newCurl;
×
134
    newCurl = NULL;
×
135
  }
136

137
  if ((*ppHandle)->url == NULL) {
×
138
    (*ppHandle)->url = taosStrdup(url);
×
139
    TSDB_CHECK_NULL((*ppHandle)->url, code, lino, _end, terrno);
×
140
  }
141

142
_end:
×
143
  if (code != TSDB_CODE_SUCCESS) {
×
144
    tqError("%s failed at line %d since %d, %s", __func__, lino, res, tstrerror(code));
×
145
    if (*ppHandle) {
×
146
      releaseStreamNotifyHandle(ppHandle);
×
147
    }
148
    *ppHandle = NULL;
×
149
  }
150
  if (newCurl) {
×
151
    curl_easy_cleanup(newCurl);
×
152
  }
153
  if (pNewHandle) {
×
154
    destroyStreamNotifyHandle(&pNewHandle);
×
155
  }
156
  if (gLocked) {
×
157
    (void)taosThreadMutexUnlock(&pMap->gMutex);
×
158
  }
159
  return code;
×
160
#else
161
  tqError("stream notify events is not supported on windows");
162
  return TSDB_CODE_NOT_SUPPORTTED_IN_WINDOWS;
163
#endif
164
}
165

166
int32_t tqInitNotifyHandleMap(SStreamNotifyHandleMap** ppMap) {
11,512✔
167
  int32_t                 code = TSDB_CODE_SUCCESS;
11,512✔
168
  int32_t                 lino = 0;
11,512✔
169
  SStreamNotifyHandleMap* pMap = NULL;
11,512✔
170

171
  TSDB_CHECK_NULL(ppMap, code, lino, _end, TSDB_CODE_INVALID_PARA);
11,512!
172

173
  *ppMap = NULL;
11,512✔
174
  pMap = taosMemoryCalloc(1, sizeof(SStreamNotifyHandleMap));
11,512!
175
  TSDB_CHECK_NULL(pMap, code, lino, _end, terrno);
11,674!
176
  code = taosThreadMutexInit(&pMap->gMutex, NULL);
11,674✔
177
  TSDB_CHECK_CODE(code, lino, _end);
11,675!
178
  pMap->handleMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
11,675✔
179
  TSDB_CHECK_NULL(pMap->handleMap, code, lino, _end, terrno);
11,669!
180
  taosHashSetFreeFp(pMap->handleMap, destroyStreamNotifyHandle);
11,669✔
181
  *ppMap = pMap;
11,670✔
182
  pMap = NULL;
11,670✔
183

184
_end:
11,670✔
185
  if (code != TSDB_CODE_SUCCESS) {
11,670!
186
    tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
187
  }
188
  if (pMap != NULL) {
11,671!
189
    tqDestroyNotifyHandleMap(&pMap);
×
190
  }
191
  return code;
11,671✔
192
}
193

194
void tqDestroyNotifyHandleMap(SStreamNotifyHandleMap** ppMap) {
11,677✔
195
  int32_t code = TSDB_CODE_SUCCESS;
11,677✔
196
  int32_t lino = 0;
11,677✔
197

198
  if (*ppMap == NULL) {
11,677!
199
    return;
×
200
  }
201
  taosHashCleanup((*ppMap)->handleMap);
11,677✔
202
  code = taosThreadMutexDestroy(&(*ppMap)->gMutex);
11,677✔
203
  taosMemoryFreeClear((*ppMap));
11,677!
204
}
205

206
#define JSON_CHECK_ADD_ITEM(obj, str, item) \
207
  TSDB_CHECK_CONDITION(cJSON_AddItemToObjectCS(obj, str, item), code, lino, _end, TSDB_CODE_OUT_OF_MEMORY)
208

209
static int32_t getStreamNotifyEventHeader(const char* streamName, char** pHeader) {
×
210
  int32_t code = TSDB_CODE_SUCCESS;
×
211
  int32_t lino = 0;
×
212
  cJSON*  obj = NULL;
×
213
  cJSON*  streams = NULL;
×
214
  cJSON*  stream = NULL;
×
215
  char    msgId[37];
216

217
  TSDB_CHECK_NULL(streamName, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
218
  TSDB_CHECK_NULL(pHeader, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
219

220
  *pHeader = NULL;
×
221

222
  code = taosGetSystemUUIDLimit36(msgId, sizeof(msgId));
×
223
  TSDB_CHECK_CODE(code, lino, _end);
×
224

225
  stream = cJSON_CreateObject();
×
226
  TSDB_CHECK_NULL(stream, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
×
227
  JSON_CHECK_ADD_ITEM(stream, "streamName", cJSON_CreateStringReference(streamName));
×
228
  JSON_CHECK_ADD_ITEM(stream, "events", cJSON_CreateArray());
×
229

230
  streams = cJSON_CreateArray();
×
231
  TSDB_CHECK_CONDITION(cJSON_AddItemToArray(streams, stream), code, lino, _end, TSDB_CODE_OUT_OF_MEMORY)
×
232
  stream = NULL;
×
233

234
  obj = cJSON_CreateObject();
×
235
  TSDB_CHECK_NULL(obj, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
×
236
  JSON_CHECK_ADD_ITEM(obj, "messageId", cJSON_CreateStringReference(msgId));
×
237
  JSON_CHECK_ADD_ITEM(obj, "timestamp", cJSON_CreateNumber(taosGetTimestampMs()));
×
238
  JSON_CHECK_ADD_ITEM(obj, "streams", streams);
×
239
  streams = NULL;
×
240

241
  *pHeader = cJSON_PrintUnformatted(obj);
×
242
  TSDB_CHECK_NULL(*pHeader, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
×
243

244
_end:
×
245
  if (code != TSDB_CODE_SUCCESS) {
×
246
    tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
247
  }
248
  if (stream != NULL) {
×
249
    cJSON_Delete(stream);
×
250
  }
251
  if (streams != NULL) {
×
252
    cJSON_Delete(streams);
×
253
  }
254
  if (obj != NULL) {
×
255
    cJSON_Delete(obj);
×
256
  }
257
  return code;
×
258
}
259

260
static int32_t packupStreamNotifyEvent(const char* streamName, const SArray* pBlocks, char** pMsg,
×
261
                                       int32_t* nNotifyEvents) {
262
  int32_t     code = TSDB_CODE_SUCCESS;
×
263
  int32_t     lino = 0;
×
264
  int32_t     numOfBlocks = 0;
×
265
  int32_t     msgHeaderLen = 0;
×
266
  int32_t     msgTailLen = 0;
×
267
  int32_t     msgLen = 0;
×
268
  char*       msgHeader = NULL;
×
269
  const char* msgTail = "]}]}";
×
270
  char*       msg = NULL;
×
271

272
  TSDB_CHECK_NULL(pMsg, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
273

274
  *pMsg = NULL;
×
275
  numOfBlocks = taosArrayGetSize(pBlocks);
×
276
  *nNotifyEvents = 0;
×
277

278
  for (int32_t i = 0; i < numOfBlocks; ++i) {
×
279
    SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
×
280
    if (pDataBlock == NULL || pDataBlock->info.type != STREAM_NOTIFY_EVENT) {
×
281
      continue;
×
282
    }
283

284
    SColumnInfoData* pEventStrCol = taosArrayGet(pDataBlock->pDataBlock, NOTIFY_EVENT_STR_COLUMN_INDEX);
×
285
    for (int32_t j = 0; j < pDataBlock->info.rows; ++j) {
×
286
      char* val = colDataGetVarData(pEventStrCol, j);
×
287
      msgLen += varDataLen(val) + 1;
×
288
    }
289
    *nNotifyEvents += pDataBlock->info.rows;
×
290
  }
291

292
  if (msgLen == 0) {
×
293
    // skip since no notification events found
294
    goto _end;
×
295
  }
296

297
  code = getStreamNotifyEventHeader(streamName, &msgHeader);
×
298
  TSDB_CHECK_CODE(code, lino, _end);
×
299
  msgHeaderLen = strlen(msgHeader);
×
300
  msgTailLen = strlen(msgTail);
×
301
  msgLen += msgHeaderLen;
×
302

303
  msg = taosMemoryMalloc(msgLen);
×
304
  TSDB_CHECK_NULL(msg, code, lino, _end, terrno);
×
305
  char* p = msg;
×
306
  TAOS_STRNCPY(p, msgHeader, msgHeaderLen);
×
307
  p += msgHeaderLen - msgTailLen;
×
308

309
  for (int32_t i = 0; i < numOfBlocks; ++i) {
×
310
    SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
×
311
    if (pDataBlock == NULL || pDataBlock->info.type != STREAM_NOTIFY_EVENT) {
×
312
      continue;
×
313
    }
314

315
    SColumnInfoData* pEventStrCol = taosArrayGet(pDataBlock->pDataBlock, NOTIFY_EVENT_STR_COLUMN_INDEX);
×
316
    for (int32_t j = 0; j < pDataBlock->info.rows; ++j) {
×
317
      char* val = colDataGetVarData(pEventStrCol, j);
×
318
      TAOS_STRNCPY(p, varDataVal(val), varDataLen(val));
×
319
      p += varDataLen(val);
×
320
      *(p++) = ',';
×
321
    }
322
  }
323

324
  p -= 1;
×
325
  TAOS_STRNCPY(p, msgTail, msgTailLen);
×
326
  *(p + msgTailLen) = '\0';
×
327

328
  *pMsg = msg;
×
329
  msg = NULL;
×
330

331
_end:
×
332
  if (code != TSDB_CODE_SUCCESS) {
×
333
    tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
334
  }
335
  if (msgHeader != NULL) {
×
336
    cJSON_free(msgHeader);
×
337
  }
338
  if (msg != NULL) {
×
339
    taosMemoryFreeClear(msg);
×
340
  }
341
  return code;
×
342
}
343

344
static int32_t sendSingleStreamNotify(SStreamNotifyHandle* pHandle, char* msg) {
×
345
#ifndef WINDOWS
346
  int32_t  code = TSDB_CODE_SUCCESS;
×
347
  int32_t  lino = 0;
×
348
  CURLcode res = CURLE_OK;
×
349
  uint64_t sentLen = 0;
×
350
  uint64_t totalLen = 0;
×
351
  size_t   nbytes = 0;
×
352

353
  TSDB_CHECK_NULL(pHandle, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
354
  TSDB_CHECK_NULL(pHandle->curl, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
355

356
  totalLen = strlen(msg);
×
357
  while (sentLen < totalLen) {
×
358
    res = curl_ws_send(pHandle->curl, msg + sentLen, totalLen - sentLen, &nbytes, 0, CURLWS_TEXT);
×
359
    TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
×
360
    sentLen += nbytes;
×
361
  }
362

363
_end:
×
364
  if (code != TSDB_CODE_SUCCESS) {
×
365
    tqError("%s failed at line %d since %d, %s", __func__, lino, res, tstrerror(code));
×
366
    stopStreamNotifyConn(pHandle);
×
367
  }
368
  return code;
×
369
#else
370
  tqError("stream notify events is not supported on windows");
371
  return TSDB_CODE_NOT_SUPPORTTED_IN_WINDOWS;
372
#endif
373
}
374

375
int32_t tqSendAllNotifyEvents(const SArray* pBlocks, SStreamTask* pTask, SVnode* pVnode) {
15,957✔
376
  int32_t              code = TSDB_CODE_SUCCESS;
15,957✔
377
  int32_t              lino = 0;
15,957✔
378
  char*                msg = NULL;
15,957✔
379
  int32_t              nNotifyAddr = 0;
15,957✔
380
  int32_t              nNotifyEvents = 0;
15,957✔
381
  SStreamNotifyHandle* pHandle = NULL;
15,957✔
382

383
  TSDB_CHECK_NULL(pTask, code, lino, _end, TSDB_CODE_INVALID_PARA);
15,957!
384
  TSDB_CHECK_NULL(pVnode, code, lino, _end, TSDB_CODE_INVALID_PARA);
15,957!
385

386
  nNotifyAddr = taosArrayGetSize(pTask->notifyInfo.pNotifyAddrUrls);
15,957✔
387
  if (nNotifyAddr == 0) {
15,961!
388
    goto _end;
15,961✔
389
  }
390

391
  code = packupStreamNotifyEvent(pTask->notifyInfo.streamName, pBlocks, &msg, &nNotifyEvents);
×
392
  TSDB_CHECK_CODE(code, lino, _end);
×
393
  if (msg == NULL) {
×
394
    goto _end;
×
395
  }
396

397
  tqDebug("stream task %s prepare to send %d notify events, total msg length: %" PRIu64, pTask->notifyInfo.streamName,
×
398
          nNotifyEvents, (uint64_t)strlen(msg));
399

400
  for (int32_t i = 0; i < nNotifyAddr; ++i) {
×
401
    if (streamTaskShouldStop(pTask)) {
×
402
      break;
×
403
    }
404
    const char* url = taosArrayGetP(pTask->notifyInfo.pNotifyAddrUrls, i);
×
405
    code = acquireStreamNotifyHandle(pVnode->pNotifyHandleMap, url, &pHandle);
×
406
    if (code != TSDB_CODE_SUCCESS) {
×
407
      tqError("failed to get stream notify handle of %s", url);
×
408
      if (pTask->notifyInfo.notifyErrorHandle == SNOTIFY_ERROR_HANDLE_PAUSE) {
×
409
        // retry for event message sending in PAUSE error handling mode
410
        taosMsleep(STREAM_EVENT_NOTIFY_RETRY_MS);
×
411
        --i;
×
412
        continue;
×
413
      } else {
414
        // simply ignore the failure in DROP error handling mode
415
        code = TSDB_CODE_SUCCESS;
×
416
        continue;
×
417
      }
418
    }
419
    code = sendSingleStreamNotify(pHandle, msg);
×
420
    if (code != TSDB_CODE_SUCCESS) {
×
421
      tqError("failed to send stream notify handle to %s since %s", url, tstrerror(code));
×
422
      if (pTask->notifyInfo.notifyErrorHandle == SNOTIFY_ERROR_HANDLE_PAUSE) {
×
423
        // retry for event message sending in PAUSE error handling mode
424
        taosMsleep(STREAM_EVENT_NOTIFY_RETRY_MS);
×
425
        --i;
×
426
      } else {
427
        // simply ignore the failure in DROP error handling mode
428
        code = TSDB_CODE_SUCCESS;
×
429
      }
430
    } else {
431
      tqDebug("stream task %s send %d notify events to %s successfully", pTask->notifyInfo.streamName, nNotifyEvents,
×
432
              url);
433
    }
434
    releaseStreamNotifyHandle(&pHandle);
×
435
  }
436

437
_end:
×
438
  if (code != TSDB_CODE_SUCCESS) {
15,961!
439
    tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
440
  }
441
  if (msg) {
15,960!
442
    taosMemoryFreeClear(msg);
×
443
  }
444
  return code;
15,960✔
445
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc