• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3798

31 Mar 2025 10:39AM UTC coverage: 9.424% (-20.9%) from 30.372%
#3798

push

travis-ci

happyguoxy
test:add test cases

21549 of 307601 branches covered (7.01%)

Branch coverage included in aggregate %.

36084 of 303967 relevant lines covered (11.87%)

58620.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/vnode/src/tq/tqStreamNotify.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cmdnodes.h"
17
#include "tq.h"
18

19
#ifndef WINDOWS
20
#include "curl/curl.h"
21
#endif
22

23
// Pause, in milliseconds, before a failed notification is retried when the task uses PAUSE error handling.
#define STREAM_EVENT_NOTIFY_RETRY_MS         50          // 50 ms
24
typedef struct SStreamNotifyHandle {
25
  TdThreadMutex mutex;
26
#ifndef WINDOWS
27
  CURL* curl;
28
#endif
29
  char* url;
30
} SStreamNotifyHandle;
31

32
struct SStreamNotifyHandleMap {
33
  TdThreadMutex gMutex;
34
  SHashObj*     handleMap;
35
};
36

37
// Close the websocket connection owned by pHandle, if there is one.
// A CLOSE frame is sent first; a failure to send it is only logged, and the curl handle is
// cleaned up and reset to NULL either way. Does nothing on Windows builds.
static void stopStreamNotifyConn(SStreamNotifyHandle* pHandle) {
#ifndef WINDOWS
  if (pHandle != NULL && pHandle->curl != NULL) {
    // 1000 is the websocket status code for a normal closure; it goes out in network byte order
    uint16_t closeStatus = htons(1000);
    size_t   nSent = 0;
    CURLcode rc = curl_ws_send(pHandle->curl, &closeStatus, sizeof(closeStatus), &nSent, 0, CURLWS_CLOSE);
    if (rc != CURLE_OK) {
      tqWarn("failed to send ws-close msg to %s for %d", pHandle->url ? pHandle->url : "", rc);
    }
    // TODO: add wait mechanism for peer connection close response
    curl_easy_cleanup(pHandle->curl);
    pHandle->curl = NULL;
  }
#endif
}
54

55
static void destroyStreamNotifyHandle(void* ptr) {
×
56
  int32_t               code = TSDB_CODE_SUCCESS;
×
57
  int32_t               lino = 0;
×
58
  SStreamNotifyHandle** ppHandle = ptr;
×
59

60
  if (ppHandle == NULL || *ppHandle == NULL) {
×
61
    return;
×
62
  }
63
  code = taosThreadMutexDestroy(&(*ppHandle)->mutex);
×
64
  stopStreamNotifyConn(*ppHandle);
×
65
  taosMemoryFreeClear((*ppHandle)->url);
×
66
  taosMemoryFreeClear(*ppHandle);
×
67
}
68

69
// Give back a handle obtained from acquireStreamNotifyHandle(): unlock its mutex and clear the
// caller's pointer. A NULL argument or an already-cleared pointer is a no-op.
static void releaseStreamNotifyHandle(SStreamNotifyHandle** ppHandle) {
  if (ppHandle != NULL && *ppHandle != NULL) {
    SStreamNotifyHandle* pHeld = *ppHandle;
    (void)taosThreadMutexUnlock(&pHeld->mutex);
    *ppHandle = NULL;
  }
}
76

77
static int32_t acquireStreamNotifyHandle(SStreamNotifyHandleMap* pMap, const char* url,
×
78
                                         SStreamNotifyHandle** ppHandle) {
79
#ifndef WINDOWS
80
  int32_t               code = TSDB_CODE_SUCCESS;
×
81
  int32_t               lino = 0;
×
82
  bool                  gLocked = false;
×
83
  SStreamNotifyHandle** ppFindHandle = NULL;
×
84
  SStreamNotifyHandle*  pNewHandle = NULL;
×
85
  CURL*                 newCurl = NULL;
×
86
  CURLcode              res = CURLE_OK;
×
87

88
  TSDB_CHECK_NULL(pMap, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
89
  TSDB_CHECK_NULL(url, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
90
  TSDB_CHECK_NULL(ppHandle, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
91

92
  *ppHandle = NULL;
×
93

94
  code = taosThreadMutexLock(&pMap->gMutex);
×
95
  TSDB_CHECK_CODE(code, lino, _end);
×
96
  gLocked = true;
×
97

98
  ppFindHandle = taosHashGet(pMap->handleMap, url, strlen(url));
×
99
  if (ppFindHandle == NULL) {
×
100
    pNewHandle = taosMemoryCalloc(1, sizeof(SStreamNotifyHandle));
×
101
    TSDB_CHECK_NULL(pNewHandle, code, lino, _end, terrno);
×
102
    code = taosThreadMutexInit(&pNewHandle->mutex, NULL);
×
103
    TSDB_CHECK_CODE(code, lino, _end);
×
104
    code = taosHashPut(pMap->handleMap, url, strlen(url), &pNewHandle, POINTER_BYTES);
×
105
    TSDB_CHECK_CODE(code, lino, _end);
×
106
    *ppHandle = pNewHandle;
×
107
    pNewHandle = NULL;
×
108
  } else {
109
    *ppHandle = *ppFindHandle;
×
110
  }
111

112
  code = taosThreadMutexLock(&(*ppHandle)->mutex);
×
113
  TSDB_CHECK_CODE(code, lino, _end);
×
114

115
  (void)taosThreadMutexUnlock(&pMap->gMutex);
×
116
  gLocked = false;
×
117

118
  if ((*ppHandle)->curl == NULL) {
×
119
    newCurl = curl_easy_init();
×
120
    TSDB_CHECK_NULL(newCurl, code, lino, _end, TSDB_CODE_FAILED);
×
121
    res = curl_easy_setopt(newCurl, CURLOPT_URL, url);
×
122
    TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
×
123
    res = curl_easy_setopt(newCurl, CURLOPT_SSL_VERIFYPEER, 0L);
×
124
    TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
×
125
    res = curl_easy_setopt(newCurl, CURLOPT_SSL_VERIFYHOST, 0L);
×
126
    TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
×
127
    res = curl_easy_setopt(newCurl, CURLOPT_TIMEOUT, 3L);
×
128
    TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
×
129
    res = curl_easy_setopt(newCurl, CURLOPT_CONNECT_ONLY, 2L);
×
130
    TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
×
131
    res = curl_easy_perform(newCurl);
×
132
    TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
×
133
    (*ppHandle)->curl = newCurl;
×
134
    newCurl = NULL;
×
135
  }
136

137
  if ((*ppHandle)->url == NULL) {
×
138
    (*ppHandle)->url = taosStrdup(url);
×
139
    TSDB_CHECK_NULL((*ppHandle)->url, code, lino, _end, terrno);
×
140
  }
141

142
_end:
×
143
  if (code != TSDB_CODE_SUCCESS) {
×
144
    tqError("%s failed at line %d since %d, %s", __func__, lino, res, tstrerror(code));
×
145
    if (*ppHandle) {
×
146
      releaseStreamNotifyHandle(ppHandle);
×
147
    }
148
    *ppHandle = NULL;
×
149
  }
150
  if (newCurl) {
×
151
    curl_easy_cleanup(newCurl);
×
152
  }
153
  if (pNewHandle) {
×
154
    destroyStreamNotifyHandle(&pNewHandle);
×
155
  }
156
  if (gLocked) {
×
157
    (void)taosThreadMutexUnlock(&pMap->gMutex);
×
158
  }
159
  return code;
×
160
#else
161
  tqError("stream notify events is not supported on windows");
162
  return TSDB_CODE_NOT_SUPPORTTED_IN_WINDOWS;
163
#endif
164
}
165

166
// Allocate and initialise an empty notify-handle registry.
//
// ppMap - out: receives the new map on success and is NULL on failure.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when ppMap is NULL, or the error reported
// by the allocation / mutex / hash helpers. A partially built map is torn down before returning.
int32_t tqInitNotifyHandleMap(SStreamNotifyHandleMap** ppMap) {
  int32_t                 code = TSDB_CODE_SUCCESS;
  int32_t                 lino = 0;
  SStreamNotifyHandleMap* pNewMap = NULL;

  TSDB_CHECK_NULL(ppMap, code, lino, _end, TSDB_CODE_INVALID_PARA);
  *ppMap = NULL;

  pNewMap = taosMemoryCalloc(1, sizeof(SStreamNotifyHandleMap));
  TSDB_CHECK_NULL(pNewMap, code, lino, _end, terrno);

  code = taosThreadMutexInit(&pNewMap->gMutex, NULL);
  TSDB_CHECK_CODE(code, lino, _end);

  // The hash map is created without its own locking; gMutex is what acquireStreamNotifyHandle() takes.
  pNewMap->handleMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  TSDB_CHECK_NULL(pNewMap->handleMap, code, lino, _end, terrno);
  taosHashSetFreeFp(pNewMap->handleMap, destroyStreamNotifyHandle);

  // Hand the finished map to the caller so the cleanup below skips it.
  *ppMap = pNewMap;
  pNewMap = NULL;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  if (pNewMap != NULL) {
    tqDestroyNotifyHandleMap(&pNewMap);
  }
  return code;
}
193

194
// Tear down a registry created by tqInitNotifyHandleMap(): clean up the hash map (whose free
// callback is destroyStreamNotifyHandle), destroy the map mutex, then free the map and clear
// *ppMap. A NULL ppMap or a NULL *ppMap is ignored.
void tqDestroyNotifyHandleMap(SStreamNotifyHandleMap** ppMap) {
  if (ppMap == NULL || *ppMap == NULL) {
    return;
  }
  taosHashCleanup((*ppMap)->handleMap);
  // Nothing useful can be done if the destroy fails, so the result is deliberately dropped.
  (void)taosThreadMutexDestroy(&(*ppMap)->gMutex);
  taosMemoryFreeClear((*ppMap));
}
205

206
#define JSON_CHECK_ADD_ITEM(obj, str, item) \
207
  TSDB_CHECK_CONDITION(cJSON_AddItemToObjectCS(obj, str, item), code, lino, _end, TSDB_CODE_OUT_OF_MEMORY)
208

209
static int32_t getStreamNotifyEventHeader(const char* streamName, char** pHeader) {
×
210
  int32_t code = TSDB_CODE_SUCCESS;
×
211
  int32_t lino = 0;
×
212
  cJSON*  obj = NULL;
×
213
  cJSON*  streams = NULL;
×
214
  cJSON*  stream = NULL;
×
215
  char    msgId[37];
216

217
  TSDB_CHECK_NULL(streamName, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
218
  TSDB_CHECK_NULL(pHeader, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
219

220
  *pHeader = NULL;
×
221

222
  code = taosGetSystemUUIDLimit36(msgId, sizeof(msgId));
×
223
  TSDB_CHECK_CODE(code, lino, _end);
×
224

225
  stream = cJSON_CreateObject();
×
226
  TSDB_CHECK_NULL(stream, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
×
227
  JSON_CHECK_ADD_ITEM(stream, "streamName", cJSON_CreateStringReference(streamName));
×
228
  JSON_CHECK_ADD_ITEM(stream, "events", cJSON_CreateArray());
×
229

230
  streams = cJSON_CreateArray();
×
231
  TSDB_CHECK_CONDITION(cJSON_AddItemToArray(streams, stream), code, lino, _end, TSDB_CODE_OUT_OF_MEMORY)
×
232
  stream = NULL;
×
233

234
  obj = cJSON_CreateObject();
×
235
  TSDB_CHECK_NULL(obj, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
×
236
  JSON_CHECK_ADD_ITEM(obj, "messageId", cJSON_CreateStringReference(msgId));
×
237
  JSON_CHECK_ADD_ITEM(obj, "timestamp", cJSON_CreateNumber(taosGetTimestampMs()));
×
238
  JSON_CHECK_ADD_ITEM(obj, "streams", streams);
×
239
  streams = NULL;
×
240

241
  *pHeader = cJSON_PrintUnformatted(obj);
×
242
  TSDB_CHECK_NULL(*pHeader, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
×
243

244
_end:
×
245
  if (code != TSDB_CODE_SUCCESS) {
×
246
    tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
247
  }
248
  if (stream != NULL) {
×
249
    cJSON_Delete(stream);
×
250
  }
251
  if (streams != NULL) {
×
252
    cJSON_Delete(streams);
×
253
  }
254
  if (obj != NULL) {
×
255
    cJSON_Delete(obj);
×
256
  }
257
  return code;
×
258
}
259

260
// Pack the notify-event rows of a run of blocks, starting at *pBlockIdx, into one JSON message.
//
// streamName       - stream name written into the message header.
// pBlocks          - array of SSDataBlock; blocks whose type is not STREAM_NOTIFY_EVENT are skipped.
// pMsg             - out: message allocated with taosMemoryMalloc(), or NULL when the scanned
//                    blocks carry no event text. The caller frees it.
// nNotifyEvents    - out: number of event rows counted for this message.
// pNotifyEventStat - pack counters, updated only when a message is produced.
// pBlockIdx        - in/out: index of the first block to scan; advanced past every scanned block.
//
// Scanning stops after the block that takes the accumulated event text to
// tsStreamNotifyMessageSize * 1024 bytes or more, so one call may consume only part of pBlocks.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for a NULL out-parameter or a missing
// event column, or the error from building the header / allocating the buffer.
static int32_t packupStreamNotifyEvent(const char* streamName, const SArray* pBlocks, char** pMsg,
                                       int32_t* nNotifyEvents, STaskNotifyEventStat* pNotifyEventStat,
                                       int32_t* pBlockIdx) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  int32_t     numOfBlocks = 0;
  int32_t     msgHeaderLen = 0;
  int32_t     msgTailLen = 0;
  int32_t     msgLen = 0;
  char*       msgHeader = NULL;
  const char* msgTail = "]}]}";
  char*       msg = NULL;
  char*       p = NULL;
  int64_t     startTime = 0;
  int64_t     endTime = 0;
  int32_t     nBlocks = 0;

  TSDB_CHECK_NULL(pMsg, code, lino, _end, TSDB_CODE_INVALID_PARA);
  *pMsg = NULL;
  TSDB_CHECK_NULL(pNotifyEventStat, code, lino, _end, TSDB_CODE_INVALID_PARA);
  // These two are dereferenced below as well, so validate them like the other pointers.
  TSDB_CHECK_NULL(nNotifyEvents, code, lino, _end, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pBlockIdx, code, lino, _end, TSDB_CODE_INVALID_PARA);

  numOfBlocks = taosArrayGetSize(pBlocks);
  *nNotifyEvents = 0;

  // Pass 1: size the payload. Each event needs its text plus one separator byte.
  for (int32_t i = *pBlockIdx; i < numOfBlocks; ++i) {
    SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
    nBlocks++;
    if (pDataBlock == NULL || pDataBlock->info.type != STREAM_NOTIFY_EVENT) {
      continue;
    }

    SColumnInfoData* pEventStrCol = taosArrayGet(pDataBlock->pDataBlock, NOTIFY_EVENT_STR_COLUMN_INDEX);
    TSDB_CHECK_NULL(pEventStrCol, code, lino, _end, TSDB_CODE_INVALID_PARA);
    for (int32_t j = 0; j < pDataBlock->info.rows; ++j) {
      char* val = colDataGetVarData(pEventStrCol, j);
      msgLen += varDataLen(val) + 1;
    }
    *nNotifyEvents += pDataBlock->info.rows;
    if (msgLen >= tsStreamNotifyMessageSize * 1024) {
      break;
    }
  }

  *pBlockIdx += nBlocks;

  if (msgLen == 0) {
    // skip since no notification events found
    goto _end;
  }

  startTime = taosGetMonoTimestampMs();
  code = getStreamNotifyEventHeader(streamName, &msgHeader);
  TSDB_CHECK_CODE(code, lino, _end);
  msgHeaderLen = strlen(msgHeader);
  msgTailLen = strlen(msgTail);
  msgLen += msgHeaderLen;

  // Buffer budget: header bytes + event bytes + one byte per event. The last event's separator
  // byte is reused for the terminating NUL, and the tail overwrites the header's own closing
  // "]}]}", so no extra room is needed for it.
  msg = taosMemoryMalloc(msgLen);
  TSDB_CHECK_NULL(msg, code, lino, _end, terrno);
  p = msg;
  TAOS_STRNCPY(p, msgHeader, msgHeaderLen);
  // Step back over the header's closing brackets so events land inside the empty "events" array.
  p += msgHeaderLen - msgTailLen;

  // Pass 2: copy the events of exactly the blocks counted in pass 1, comma separated.
  for (int32_t i = *pBlockIdx - nBlocks; i < *pBlockIdx; ++i) {
    SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
    if (pDataBlock == NULL || pDataBlock->info.type != STREAM_NOTIFY_EVENT) {
      continue;
    }

    SColumnInfoData* pEventStrCol = taosArrayGet(pDataBlock->pDataBlock, NOTIFY_EVENT_STR_COLUMN_INDEX);
    TSDB_CHECK_NULL(pEventStrCol, code, lino, _end, TSDB_CODE_INVALID_PARA);
    for (int32_t j = 0; j < pDataBlock->info.rows; ++j) {
      char* val = colDataGetVarData(pEventStrCol, j);
      TAOS_STRNCPY(p, varDataVal(val), varDataLen(val));
      p += varDataLen(val);
      *(p++) = ',';
    }
  }

  // Replace the trailing comma with the closing brackets and terminate the string.
  p -= 1;
  TAOS_STRNCPY(p, msgTail, msgTailLen);
  *(p + msgTailLen) = '\0';

  *pMsg = msg;
  msg = NULL;

  endTime = taosGetMonoTimestampMs();
  pNotifyEventStat->notifyEventPackTimes++;
  pNotifyEventStat->notifyEventPackElems += *nNotifyEvents;
  pNotifyEventStat->notifyEventPackCostSec += (endTime - startTime) / 1000.0;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  if (msgHeader != NULL) {
    cJSON_free(msgHeader);
  }
  if (msg != NULL) {
    taosMemoryFreeClear(msg);
  }
  return code;
}
360

361
// Send one text message over the handle's websocket connection.
//
// pHandle - handle with an open connection; the caller is expected to hold it via
//           acquireStreamNotifyHandle().
// msg     - NUL-terminated message text; an empty string sends nothing.
//
// For a non-empty message a PING frame is sent first to check the connection, then the text
// goes out in chunks of at most tsStreamNotifyFrameSize * 1024 bytes. On any failure the
// connection is closed with stopStreamNotifyConn().
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for a NULL handle, connection or message,
// or TSDB_CODE_FAILED when libcurl reports an error.
// On Windows builds it always returns TSDB_CODE_NOT_SUPPORTTED_IN_WINDOWS.
static int32_t sendSingleStreamNotify(SStreamNotifyHandle* pHandle, char* msg) {
#ifndef WINDOWS
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  CURLcode res = CURLE_OK;
  uint64_t sentLen = 0;
  uint64_t totalLen = 0;
  size_t   nbytes = 0;

  TSDB_CHECK_NULL(pHandle, code, lino, _end, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pHandle->curl, code, lino, _end, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(msg, code, lino, _end, TSDB_CODE_INVALID_PARA);

  totalLen = strlen(msg);
  if (totalLen > 0) {
    // send PING frame to check if the connection is still alive; the byte count goes into a
    // real size_t instead of a casted uint64_t
    res = curl_ws_send(pHandle->curl, "", 0, &nbytes, 0, CURLWS_PING);
    TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
  }
  sentLen = 0;
  while (sentLen < totalLen) {
    size_t chunkSize = TMIN(totalLen - sentLen, tsStreamNotifyFrameSize * 1024);
    if (sentLen == 0) {
      // the first chunk announces the total size of the message
      res = curl_ws_send(pHandle->curl, msg, chunkSize, &nbytes, totalLen, CURLWS_TEXT | CURLWS_OFFSET);
      TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
    } else {
      // later chunks pass 0 for the size argument
      res = curl_ws_send(pHandle->curl, msg + sentLen, chunkSize, &nbytes, 0, CURLWS_TEXT | CURLWS_OFFSET);
      TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
    }
    // advance by what libcurl reports as sent, which may be less than chunkSize
    sentLen += nbytes;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("%s failed at line %d since %d, %s", __func__, lino, res, tstrerror(code));
    stopStreamNotifyConn(pHandle);
  }
  return code;
#else
  tqError("stream notify events is not supported on windows");
  return TSDB_CODE_NOT_SUPPORTTED_IN_WINDOWS;
#endif
}
403

404
// Pack the notify events found in pBlocks into one or more messages and send each message to
// every notification URL configured on the task.
//
// pBlocks - array of result blocks; only notify-event blocks contribute to the messages.
// pTask   - stream task supplying the stream name, URL list, error-handling mode and counters.
// pVnode  - vnode that owns the notify-handle registry (pNotifyHandleMap).
//
// Error policy per URL, chosen by pTask->notifyInfo.notifyErrorHandle:
//   SNOTIFY_ERROR_HANDLE_PAUSE - wait STREAM_EVENT_NOTIFY_RETRY_MS and retry the same URL;
//   any other mode             - ignore the failure and move on to the next URL.
// Sending of the current message stops early once streamTaskShouldStop(pTask) is true.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for a NULL task or vnode, a packing error,
// or the code of the last failed attempt when PAUSE-mode retries were cut short.
int32_t tqSendAllNotifyEvents(const SArray* pBlocks, SStreamTask* pTask, SVnode* pVnode) {
  int32_t              code = TSDB_CODE_SUCCESS;
  int32_t              lino = 0;
  char*                msg = NULL;
  int32_t              nNotifyAddr = 0;
  int32_t              nNotifyEvents = 0;
  SStreamNotifyHandle* pHandle = NULL;
  int64_t              startTime = 0;
  int64_t              endTime = 0;
  int32_t              blockIdx = 0;

  TSDB_CHECK_NULL(pTask, code, lino, _end, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pVnode, code, lino, _end, TSDB_CODE_INVALID_PARA);

  nNotifyAddr = taosArrayGetSize(pTask->notifyInfo.pNotifyAddrUrls);
  if (nNotifyAddr == 0) {
    goto _end;
  }

  while (blockIdx < taosArrayGetSize(pBlocks)) {
    // Each call consumes some blocks (advancing blockIdx) and yields at most one message.
    code = packupStreamNotifyEvent(pTask->notifyInfo.streamName, pBlocks, &msg, &nNotifyEvents, &pTask->notifyEventStat,
                                   &blockIdx);
    TSDB_CHECK_CODE(code, lino, _end);
    if (msg == NULL) {
      continue;
    }

    tqDebug("stream task %s prepare to send %d notify events, total msg length: %" PRIu64, pTask->notifyInfo.streamName,
            nNotifyEvents, (uint64_t)strlen(msg));

    startTime = taosGetMonoTimestampMs();
    for (int32_t i = 0; i < nNotifyAddr; ++i) {
      if (streamTaskShouldStop(pTask)) {
        break;
      }
      const char* url = taosArrayGetP(pTask->notifyInfo.pNotifyAddrUrls, i);
      code = acquireStreamNotifyHandle(pVnode->pNotifyHandleMap, url, &pHandle);
      if (code != TSDB_CODE_SUCCESS) {
        tqError("failed to get stream notify handle of %s", url);
      } else {
        code = sendSingleStreamNotify(pHandle, msg);
        if (code != TSDB_CODE_SUCCESS) {
          tqError("failed to send stream notify handle to %s since %s", url, tstrerror(code));
        } else {
          tqDebug("stream task %s send %d notify events to %s successfully", pTask->notifyInfo.streamName, nNotifyEvents,
                  url);
        }
        // Unlock the per-URL handle before any retry sleep, so other senders to the same URL
        // are not blocked while this task waits.
        releaseStreamNotifyHandle(&pHandle);
      }

      if (code != TSDB_CODE_SUCCESS) {
        if (pTask->notifyInfo.notifyErrorHandle == SNOTIFY_ERROR_HANDLE_PAUSE) {
          // retry for event message sending in PAUSE error handling mode
          taosMsleep(STREAM_EVENT_NOTIFY_RETRY_MS);
          --i;
        } else {
          // simply ignore the failure in DROP error handling mode
          code = TSDB_CODE_SUCCESS;
        }
      }
    }

    endTime = taosGetMonoTimestampMs();
    pTask->notifyEventStat.notifyEventSendTimes++;
    pTask->notifyEventStat.notifyEventSendElems += nNotifyEvents;
    pTask->notifyEventStat.notifyEventSendCostSec += (endTime - startTime) / 1000.0;

    taosMemoryFreeClear(msg);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  if (msg) {
    taosMemoryFreeClear(msg);
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc