• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3615

18 Feb 2025 07:41AM UTC coverage: 62.953% (+1.6%) from 61.4%
#3615

push

travis-ci

web-flow
Merge pull request #29812 from taosdata/doc/analysis

doc: update tdgpt doc.

146885 of 299602 branches covered (49.03%)

Branch coverage included in aggregate %.

230802 of 300346 relevant lines covered (76.85%)

17263824.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

10.7
/source/dnode/vnode/src/tq/tqStreamNotify.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cmdnodes.h"
17
#include "tq.h"
18

19
#ifndef WINDOWS
20
#include "curl/curl.h"
21
#endif
22

23
#define STREAM_EVENT_NOTIFY_RETRY_MS         50          // 50 ms
24
#define STREAM_EVENT_NOTIFY_MESSAAGE_SIZE_KB 8 * 1024    // 8 MB
25
#define STREAM_EVENT_NOTIFY_FRAME_SIZE       256 * 1024  // 256 KB
26

27
typedef struct SStreamNotifyHandle {
28
  TdThreadMutex mutex;
29
#ifndef WINDOWS
30
  CURL* curl;
31
#endif
32
  char* url;
33
} SStreamNotifyHandle;
34

35
struct SStreamNotifyHandleMap {
36
  TdThreadMutex gMutex;
37
  SHashObj*     handleMap;
38
};
39

40
static void stopStreamNotifyConn(SStreamNotifyHandle* pHandle) {
×
41
#ifndef WINDOWS
42
  if (pHandle == NULL || pHandle->curl == NULL) {
×
43
    return;
×
44
  }
45
  // status code 1000 means normal closure
46
  size_t   len = 0;
×
47
  uint16_t status = htons(1000);
×
48
  CURLcode res = curl_ws_send(pHandle->curl, &status, sizeof(status), &len, 0, CURLWS_CLOSE);
×
49
  if (res != CURLE_OK) {
×
50
    tqWarn("failed to send ws-close msg to %s for %d", pHandle->url ? pHandle->url : "", res);
×
51
  }
52
  // TODO: add wait mechanism for peer connection close response
53
  curl_easy_cleanup(pHandle->curl);
×
54
  pHandle->curl = NULL;
×
55
#endif
56
}
57

58
static void destroyStreamNotifyHandle(void* ptr) {
×
59
  int32_t               code = TSDB_CODE_SUCCESS;
×
60
  int32_t               lino = 0;
×
61
  SStreamNotifyHandle** ppHandle = ptr;
×
62

63
  if (ppHandle == NULL || *ppHandle == NULL) {
×
64
    return;
×
65
  }
66
  code = taosThreadMutexDestroy(&(*ppHandle)->mutex);
×
67
  stopStreamNotifyConn(*ppHandle);
×
68
  taosMemoryFreeClear((*ppHandle)->url);
×
69
  taosMemoryFreeClear(*ppHandle);
×
70
}
71

72
static void releaseStreamNotifyHandle(SStreamNotifyHandle** ppHandle) {
×
73
  if (ppHandle == NULL || *ppHandle == NULL) {
×
74
    return;
×
75
  }
76
  (void)taosThreadMutexUnlock(&(*ppHandle)->mutex);
×
77
  *ppHandle = NULL;
×
78
}
79

80
static int32_t acquireStreamNotifyHandle(SStreamNotifyHandleMap* pMap, const char* url,
×
81
                                         SStreamNotifyHandle** ppHandle) {
82
#ifndef WINDOWS
83
  int32_t               code = TSDB_CODE_SUCCESS;
×
84
  int32_t               lino = 0;
×
85
  bool                  gLocked = false;
×
86
  SStreamNotifyHandle** ppFindHandle = NULL;
×
87
  SStreamNotifyHandle*  pNewHandle = NULL;
×
88
  CURL*                 newCurl = NULL;
×
89
  CURLcode              res = CURLE_OK;
×
90

91
  TSDB_CHECK_NULL(pMap, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
92
  TSDB_CHECK_NULL(url, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
93
  TSDB_CHECK_NULL(ppHandle, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
94

95
  *ppHandle = NULL;
×
96

97
  code = taosThreadMutexLock(&pMap->gMutex);
×
98
  TSDB_CHECK_CODE(code, lino, _end);
×
99
  gLocked = true;
×
100

101
  ppFindHandle = taosHashGet(pMap->handleMap, url, strlen(url));
×
102
  if (ppFindHandle == NULL) {
×
103
    pNewHandle = taosMemoryCalloc(1, sizeof(SStreamNotifyHandle));
×
104
    TSDB_CHECK_NULL(pNewHandle, code, lino, _end, terrno);
×
105
    code = taosThreadMutexInit(&pNewHandle->mutex, NULL);
×
106
    TSDB_CHECK_CODE(code, lino, _end);
×
107
    code = taosHashPut(pMap->handleMap, url, strlen(url), &pNewHandle, POINTER_BYTES);
×
108
    TSDB_CHECK_CODE(code, lino, _end);
×
109
    *ppHandle = pNewHandle;
×
110
    pNewHandle = NULL;
×
111
  } else {
112
    *ppHandle = *ppFindHandle;
×
113
  }
114

115
  code = taosThreadMutexLock(&(*ppHandle)->mutex);
×
116
  TSDB_CHECK_CODE(code, lino, _end);
×
117

118
  (void)taosThreadMutexUnlock(&pMap->gMutex);
×
119
  gLocked = false;
×
120

121
  if ((*ppHandle)->curl == NULL) {
×
122
    newCurl = curl_easy_init();
×
123
    TSDB_CHECK_NULL(newCurl, code, lino, _end, TSDB_CODE_FAILED);
×
124
    res = curl_easy_setopt(newCurl, CURLOPT_URL, url);
×
125
    TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
×
126
    res = curl_easy_setopt(newCurl, CURLOPT_SSL_VERIFYPEER, 0L);
×
127
    TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
×
128
    res = curl_easy_setopt(newCurl, CURLOPT_SSL_VERIFYHOST, 0L);
×
129
    TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
×
130
    res = curl_easy_setopt(newCurl, CURLOPT_TIMEOUT, 3L);
×
131
    TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
×
132
    res = curl_easy_setopt(newCurl, CURLOPT_CONNECT_ONLY, 2L);
×
133
    TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
×
134
    res = curl_easy_perform(newCurl);
×
135
    TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
×
136
    (*ppHandle)->curl = newCurl;
×
137
    newCurl = NULL;
×
138
  }
139

140
  if ((*ppHandle)->url == NULL) {
×
141
    (*ppHandle)->url = taosStrdup(url);
×
142
    TSDB_CHECK_NULL((*ppHandle)->url, code, lino, _end, terrno);
×
143
  }
144

145
_end:
×
146
  if (code != TSDB_CODE_SUCCESS) {
×
147
    tqError("%s failed at line %d since %d, %s", __func__, lino, res, tstrerror(code));
×
148
    if (*ppHandle) {
×
149
      releaseStreamNotifyHandle(ppHandle);
×
150
    }
151
    *ppHandle = NULL;
×
152
  }
153
  if (newCurl) {
×
154
    curl_easy_cleanup(newCurl);
×
155
  }
156
  if (pNewHandle) {
×
157
    destroyStreamNotifyHandle(&pNewHandle);
×
158
  }
159
  if (gLocked) {
×
160
    (void)taosThreadMutexUnlock(&pMap->gMutex);
×
161
  }
162
  return code;
×
163
#else
164
  tqError("stream notify events is not supported on windows");
165
  return TSDB_CODE_NOT_SUPPORTTED_IN_WINDOWS;
166
#endif
167
}
168

169
int32_t tqInitNotifyHandleMap(SStreamNotifyHandleMap** ppMap) {
10,942✔
170
  int32_t                 code = TSDB_CODE_SUCCESS;
10,942✔
171
  int32_t                 lino = 0;
10,942✔
172
  SStreamNotifyHandleMap* pMap = NULL;
10,942✔
173

174
  TSDB_CHECK_NULL(ppMap, code, lino, _end, TSDB_CODE_INVALID_PARA);
10,942!
175

176
  *ppMap = NULL;
10,942✔
177
  pMap = taosMemoryCalloc(1, sizeof(SStreamNotifyHandleMap));
10,942!
178
  TSDB_CHECK_NULL(pMap, code, lino, _end, terrno);
11,075!
179
  code = taosThreadMutexInit(&pMap->gMutex, NULL);
11,075✔
180
  TSDB_CHECK_CODE(code, lino, _end);
11,075!
181
  pMap->handleMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
11,075✔
182
  TSDB_CHECK_NULL(pMap->handleMap, code, lino, _end, terrno);
11,071!
183
  taosHashSetFreeFp(pMap->handleMap, destroyStreamNotifyHandle);
11,071✔
184
  *ppMap = pMap;
11,071✔
185
  pMap = NULL;
11,071✔
186

187
_end:
11,071✔
188
  if (code != TSDB_CODE_SUCCESS) {
11,071!
189
    tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
190
  }
191
  if (pMap != NULL) {
11,072!
192
    tqDestroyNotifyHandleMap(&pMap);
×
193
  }
194
  return code;
11,071✔
195
}
196

197
void tqDestroyNotifyHandleMap(SStreamNotifyHandleMap** ppMap) {
11,077✔
198
  int32_t code = TSDB_CODE_SUCCESS;
11,077✔
199
  int32_t lino = 0;
11,077✔
200

201
  if (*ppMap == NULL) {
11,077!
202
    return;
×
203
  }
204
  taosHashCleanup((*ppMap)->handleMap);
11,077✔
205
  code = taosThreadMutexDestroy(&(*ppMap)->gMutex);
11,077✔
206
  taosMemoryFreeClear((*ppMap));
11,077!
207
}
208

209
#define JSON_CHECK_ADD_ITEM(obj, str, item) \
210
  TSDB_CHECK_CONDITION(cJSON_AddItemToObjectCS(obj, str, item), code, lino, _end, TSDB_CODE_OUT_OF_MEMORY)
211

212
static int32_t getStreamNotifyEventHeader(const char* streamName, char** pHeader) {
×
213
  int32_t code = TSDB_CODE_SUCCESS;
×
214
  int32_t lino = 0;
×
215
  cJSON*  obj = NULL;
×
216
  cJSON*  streams = NULL;
×
217
  cJSON*  stream = NULL;
×
218
  char    msgId[37];
219

220
  TSDB_CHECK_NULL(streamName, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
221
  TSDB_CHECK_NULL(pHeader, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
222

223
  *pHeader = NULL;
×
224

225
  code = taosGetSystemUUIDLimit36(msgId, sizeof(msgId));
×
226
  TSDB_CHECK_CODE(code, lino, _end);
×
227

228
  stream = cJSON_CreateObject();
×
229
  TSDB_CHECK_NULL(stream, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
×
230
  JSON_CHECK_ADD_ITEM(stream, "streamName", cJSON_CreateStringReference(streamName));
×
231
  JSON_CHECK_ADD_ITEM(stream, "events", cJSON_CreateArray());
×
232

233
  streams = cJSON_CreateArray();
×
234
  TSDB_CHECK_CONDITION(cJSON_AddItemToArray(streams, stream), code, lino, _end, TSDB_CODE_OUT_OF_MEMORY)
×
235
  stream = NULL;
×
236

237
  obj = cJSON_CreateObject();
×
238
  TSDB_CHECK_NULL(obj, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
×
239
  JSON_CHECK_ADD_ITEM(obj, "messageId", cJSON_CreateStringReference(msgId));
×
240
  JSON_CHECK_ADD_ITEM(obj, "timestamp", cJSON_CreateNumber(taosGetTimestampMs()));
×
241
  JSON_CHECK_ADD_ITEM(obj, "streams", streams);
×
242
  streams = NULL;
×
243

244
  *pHeader = cJSON_PrintUnformatted(obj);
×
245
  TSDB_CHECK_NULL(*pHeader, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
×
246

247
_end:
×
248
  if (code != TSDB_CODE_SUCCESS) {
×
249
    tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
250
  }
251
  if (stream != NULL) {
×
252
    cJSON_Delete(stream);
×
253
  }
254
  if (streams != NULL) {
×
255
    cJSON_Delete(streams);
×
256
  }
257
  if (obj != NULL) {
×
258
    cJSON_Delete(obj);
×
259
  }
260
  return code;
×
261
}
262

263
static int32_t packupStreamNotifyEvent(const char* streamName, const SArray* pBlocks, char** pMsg,
×
264
                                       int32_t* nNotifyEvents, STaskNotifyEventStat* pNotifyEventStat,
265
                                       int32_t* pBlockIdx) {
266
  int32_t     code = TSDB_CODE_SUCCESS;
×
267
  int32_t     lino = 0;
×
268
  int32_t     numOfBlocks = 0;
×
269
  int32_t     msgHeaderLen = 0;
×
270
  int32_t     msgTailLen = 0;
×
271
  int32_t     msgLen = 0;
×
272
  char*       msgHeader = NULL;
×
273
  const char* msgTail = "]}]}";
×
274
  char*       msg = NULL;
×
275
  int64_t     startTime = 0;
×
276
  int64_t     endTime = 0;
×
277
  int32_t     nBlocks = 0;
×
278

279
  TSDB_CHECK_NULL(pMsg, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
280
  TSDB_CHECK_NULL(pNotifyEventStat, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
281

282
  *pMsg = NULL;
×
283
  numOfBlocks = taosArrayGetSize(pBlocks);
×
284
  *nNotifyEvents = 0;
×
285

286
  for (int32_t i = *pBlockIdx; i < numOfBlocks; ++i) {
×
287
    SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
×
288
    nBlocks++;
×
289
    if (pDataBlock == NULL || pDataBlock->info.type != STREAM_NOTIFY_EVENT) {
×
290
      continue;
×
291
    }
292

293
    SColumnInfoData* pEventStrCol = taosArrayGet(pDataBlock->pDataBlock, NOTIFY_EVENT_STR_COLUMN_INDEX);
×
294
    for (int32_t j = 0; j < pDataBlock->info.rows; ++j) {
×
295
      char* val = colDataGetVarData(pEventStrCol, j);
×
296
      msgLen += varDataLen(val) + 1;
×
297
    }
298
    *nNotifyEvents += pDataBlock->info.rows;
×
299
    if (msgLen >= STREAM_EVENT_NOTIFY_MESSAAGE_SIZE_KB * 1024) {
×
300
      break;
×
301
    }
302
  }
303

304
  *pBlockIdx += nBlocks;
×
305

306
  if (msgLen == 0) {
×
307
    // skip since no notification events found
308
    goto _end;
×
309
  }
310

311
  startTime = taosGetMonoTimestampMs();
×
312
  code = getStreamNotifyEventHeader(streamName, &msgHeader);
×
313
  TSDB_CHECK_CODE(code, lino, _end);
×
314
  msgHeaderLen = strlen(msgHeader);
×
315
  msgTailLen = strlen(msgTail);
×
316
  msgLen += msgHeaderLen;
×
317

318
  msg = taosMemoryMalloc(msgLen);
×
319
  TSDB_CHECK_NULL(msg, code, lino, _end, terrno);
×
320
  char* p = msg;
×
321
  TAOS_STRNCPY(p, msgHeader, msgHeaderLen);
×
322
  p += msgHeaderLen - msgTailLen;
×
323

324
  for (int32_t i = *pBlockIdx - nBlocks; i < *pBlockIdx; ++i) {
×
325
    SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
×
326
    if (pDataBlock == NULL || pDataBlock->info.type != STREAM_NOTIFY_EVENT) {
×
327
      continue;
×
328
    }
329

330
    SColumnInfoData* pEventStrCol = taosArrayGet(pDataBlock->pDataBlock, NOTIFY_EVENT_STR_COLUMN_INDEX);
×
331
    for (int32_t j = 0; j < pDataBlock->info.rows; ++j) {
×
332
      char* val = colDataGetVarData(pEventStrCol, j);
×
333
      TAOS_STRNCPY(p, varDataVal(val), varDataLen(val));
×
334
      p += varDataLen(val);
×
335
      *(p++) = ',';
×
336
    }
337
  }
338

339
  p -= 1;
×
340
  TAOS_STRNCPY(p, msgTail, msgTailLen);
×
341
  *(p + msgTailLen) = '\0';
×
342

343
  *pMsg = msg;
×
344
  msg = NULL;
×
345

346
  endTime = taosGetMonoTimestampMs();
×
347
  pNotifyEventStat->notifyEventPackTimes++;
×
348
  pNotifyEventStat->notifyEventPackElems += *nNotifyEvents;
×
349
  pNotifyEventStat->notifyEventPackCostSec += (endTime - startTime) / 1000.0;
×
350

351
_end:
×
352
  if (code != TSDB_CODE_SUCCESS) {
×
353
    tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
354
  }
355
  if (msgHeader != NULL) {
×
356
    cJSON_free(msgHeader);
×
357
  }
358
  if (msg != NULL) {
×
359
    taosMemoryFreeClear(msg);
×
360
  }
361
  return code;
×
362
}
363

364
static int32_t sendSingleStreamNotify(SStreamNotifyHandle* pHandle, char* msg) {
×
365
#ifndef WINDOWS
366
  int32_t  code = TSDB_CODE_SUCCESS;
×
367
  int32_t  lino = 0;
×
368
  CURLcode res = CURLE_OK;
×
369
  uint64_t sentLen = 0;
×
370
  uint64_t totalLen = 0;
×
371
  size_t   nbytes = 0;
×
372

373
  TSDB_CHECK_NULL(pHandle, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
374
  TSDB_CHECK_NULL(pHandle->curl, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
375

376
  totalLen = strlen(msg);
×
377
  if (totalLen > 0) {
×
378
    // send PING frame to check if the connection is still alive
379
    res = curl_ws_send(pHandle->curl, "", 0, (size_t*)&sentLen, 0, CURLWS_PING);
×
380
    TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
×
381
  }
382
  sentLen = 0;
×
383
  while (sentLen < totalLen) {
×
384
    size_t chunkSize = TMIN(totalLen - sentLen, STREAM_EVENT_NOTIFY_FRAME_SIZE);
×
385
    if (sentLen == 0) {
×
386
      res = curl_ws_send(pHandle->curl, msg, chunkSize, &nbytes, totalLen, CURLWS_TEXT | CURLWS_OFFSET);
×
387
      TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
×
388
    } else {
389
      res = curl_ws_send(pHandle->curl, msg + sentLen, chunkSize, &nbytes, 0, CURLWS_TEXT | CURLWS_OFFSET);
×
390
      TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
×
391
    }
392
    sentLen += nbytes;
×
393
  }
394

395
_end:
×
396
  if (code != TSDB_CODE_SUCCESS) {
×
397
    tqError("%s failed at line %d since %d, %s", __func__, lino, res, tstrerror(code));
×
398
    stopStreamNotifyConn(pHandle);
×
399
  }
400
  return code;
×
401
#else
402
  tqError("stream notify events is not supported on windows");
403
  return TSDB_CODE_NOT_SUPPORTTED_IN_WINDOWS;
404
#endif
405
}
406

407
int32_t tqSendAllNotifyEvents(const SArray* pBlocks, SStreamTask* pTask, SVnode* pVnode) {
15,848✔
408
  int32_t              code = TSDB_CODE_SUCCESS;
15,848✔
409
  int32_t              lino = 0;
15,848✔
410
  char*                msg = NULL;
15,848✔
411
  int32_t              nNotifyAddr = 0;
15,848✔
412
  int32_t              nNotifyEvents = 0;
15,848✔
413
  SStreamNotifyHandle* pHandle = NULL;
15,848✔
414
  int64_t              startTime = 0;
15,848✔
415
  int64_t              endTime = 0;
15,848✔
416
  int32_t              blockIdx = 0;
15,848✔
417

418
  TSDB_CHECK_NULL(pTask, code, lino, _end, TSDB_CODE_INVALID_PARA);
15,848!
419
  TSDB_CHECK_NULL(pVnode, code, lino, _end, TSDB_CODE_INVALID_PARA);
15,848!
420

421
  nNotifyAddr = taosArrayGetSize(pTask->notifyInfo.pNotifyAddrUrls);
15,848✔
422
  if (nNotifyAddr == 0) {
15,850!
423
    goto _end;
15,850✔
424
  }
425

426
  while (blockIdx < taosArrayGetSize(pBlocks)) {
×
427
    code = packupStreamNotifyEvent(pTask->notifyInfo.streamName, pBlocks, &msg, &nNotifyEvents, &pTask->notifyEventStat,
×
428
                                   &blockIdx);
429
    TSDB_CHECK_CODE(code, lino, _end);
×
430
    if (msg == NULL) {
×
431
      continue;
×
432
    }
433

434
    tqDebug("stream task %s prepare to send %d notify events, total msg length: %" PRIu64, pTask->notifyInfo.streamName,
×
435
            nNotifyEvents, (uint64_t)strlen(msg));
436

437
    startTime = taosGetMonoTimestampMs();
×
438
    for (int32_t i = 0; i < nNotifyAddr; ++i) {
×
439
      if (streamTaskShouldStop(pTask)) {
×
440
        break;
×
441
      }
442
      const char* url = taosArrayGetP(pTask->notifyInfo.pNotifyAddrUrls, i);
×
443
      code = acquireStreamNotifyHandle(pVnode->pNotifyHandleMap, url, &pHandle);
×
444
      if (code != TSDB_CODE_SUCCESS) {
×
445
        tqError("failed to get stream notify handle of %s", url);
×
446
        if (pTask->notifyInfo.notifyErrorHandle == SNOTIFY_ERROR_HANDLE_PAUSE) {
×
447
          // retry for event message sending in PAUSE error handling mode
448
          taosMsleep(STREAM_EVENT_NOTIFY_RETRY_MS);
×
449
          --i;
×
450
          continue;
×
451
        } else {
452
          // simply ignore the failure in DROP error handling mode
453
          code = TSDB_CODE_SUCCESS;
×
454
          continue;
×
455
        }
456
      }
457
      code = sendSingleStreamNotify(pHandle, msg);
×
458
      if (code != TSDB_CODE_SUCCESS) {
×
459
        tqError("failed to send stream notify handle to %s since %s", url, tstrerror(code));
×
460
        if (pTask->notifyInfo.notifyErrorHandle == SNOTIFY_ERROR_HANDLE_PAUSE) {
×
461
          // retry for event message sending in PAUSE error handling mode
462
          taosMsleep(STREAM_EVENT_NOTIFY_RETRY_MS);
×
463
          --i;
×
464
        } else {
465
          // simply ignore the failure in DROP error handling mode
466
          code = TSDB_CODE_SUCCESS;
×
467
        }
468
      } else {
469
        tqDebug("stream task %s send %d notify events to %s successfully", pTask->notifyInfo.streamName, nNotifyEvents,
×
470
                url);
471
      }
472
      releaseStreamNotifyHandle(&pHandle);
×
473
    }
474

475
    endTime = taosGetMonoTimestampMs();
×
476
    pTask->notifyEventStat.notifyEventSendTimes++;
×
477
    pTask->notifyEventStat.notifyEventSendElems += nNotifyEvents;
×
478
    pTask->notifyEventStat.notifyEventSendCostSec += (endTime - startTime) / 1000.0;
×
479

480
    taosMemoryFreeClear(msg);
×
481
  }
482

483
_end:
×
484
  if (code != TSDB_CODE_SUCCESS) {
15,850!
485
    tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
486
  }
487
  if (msg) {
15,850!
488
    taosMemoryFreeClear(msg);
×
489
  }
490
  return code;
15,850✔
491
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc