• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4935

22 Jan 2026 06:38AM UTC coverage: 66.708% (+0.02%) from 66.691%
#4935

push

travis-ci

web-flow
merge: from main to 3.0 #34371

121 of 271 new or added lines in 17 files covered. (44.65%)

9066 existing lines in 149 files now uncovered.

203884 of 305637 relevant lines covered (66.71%)

125811266.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

47.96
/utils/test/c/tmqSim.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include <assert.h>
17
#include <math.h>
18
#include <stdio.h>
19
#include <stdlib.h>
20
#include <string.h>
21
#include <sys/stat.h>
22
#include <sys/types.h>
23
#include <time.h>
24

25
#include "taos.h"
26
#include "taosdef.h"
27
#include "taoserror.h"
28
#include "tlog.h"
29
#include "types.h"
30

31
#define GREEN     "\033[1;32m"
32
#define NC        "\033[0m"
33
#define min(a, b) (((a) < (b)) ? (a) : (b))
34

35
#define MAX_SQL_STR_LEN         (1024 * 1024)
36
#define MAX_ROW_STR_LEN         (16 * 1024)
37
#define MAX_CONSUMER_THREAD_CNT (16)
38
#define MAX_VGROUP_CNT          (32)
39
#define SEND_TIME_UNIT          10  // ms
40
#define MAX_SQL_LEN             1048576
41

42
typedef enum {
43
  NOTIFY_CMD_START_CONSUM,
44
  NOTIFY_CMD_START_COMMIT,
45
  NOTIFY_CMD_ID_BUTT,
46
} NOTIFY_CMD_ID;
47

48
typedef enum enumQUERY_TYPE { NO_INSERT_TYPE, INSERT_TYPE, QUERY_TYPE_BUT } QUERY_TYPE;
49

50
typedef struct {
51
  TdThread thread;
52
  int32_t  consumerId;
53

54
  int32_t ifManualCommit;
55
  // int32_t  autoCommitIntervalMs;  // 1000 ms
56
  // char     autoCommit[8];         // true, false
57
  // char     autoOffsetRest[16];    // none, earliest, latest
58

59
  TdFilePtr pConsumeRowsFile;
60
  TdFilePtr pConsumeMetaFile;
61
  int32_t   ifCheckData;
62
  int64_t   expectMsgCnt;
63

64
  int64_t consumeMsgCnt;
65
  int64_t consumeRowCnt;
66
  int64_t consumeLen;
67
  int32_t checkresult;
68

69
  char topicString[1024];
70
  char keyString[1024];
71

72
  int32_t numOfTopic;
73
  char    topics[32][64];
74

75
  int32_t numOfKey;
76
  char    key[32][64];
77
  char    value[32][64];
78

79
  tmq_t*      tmq;
80
  tmq_list_t* topicList;
81

82
  int32_t numOfVgroups;
83
  int32_t rowsOfPerVgroups[MAX_VGROUP_CNT][2];  // [i][0]: vgroup id, [i][1]: rows of consume
84
  int64_t ts;
85

86
  TAOS* taos;
87

88
  // below parameters is used by omb test
89
  int32_t producerRate;  // unit: msgs/s
90
  int64_t totalProduceMsgs;
91
  int64_t totalMsgsLen;
92

93
} SThreadInfo;
94

95
typedef struct {
96
  // input from argvs
97
  char        cdbName[32];
98
  char        dbName[64];
99
  int32_t     showMsgFlag;
100
  int32_t     showRowFlag;
101
  int32_t     saveRowFlag;
102
  int32_t     consumeDelay;  // unit s
103
  int32_t     numOfThread;
104
  int32_t     useSnapshot;
105
  int64_t     nowTime;
106
  SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT];
107

108
  SThreadInfo stProdThreads[MAX_CONSUMER_THREAD_CNT];
109

110
  // below parameters is used by omb test
111
  char    topic[64];
112
  int32_t producers;
113
  int32_t producerRate;
114
  int32_t runDurationMinutes;
115
  int32_t batchSize;
116
  int32_t payloadLen;
117
} SConfInfo;
118

119
static SConfInfo g_stConfInfo;
120
TdFilePtr        g_fp = NULL;
121
static int       running = 1;
122
char*            g_payload = NULL;
123

124
// char* g_pRowValue = NULL;
125
// TdFilePtr g_fp = NULL;
126

127
static void printHelp() {
×
128
  char indent[10] = "        ";
×
129
  printf("Used to test the tmq feature with sim cases\n");
×
130

131
  printf("%s%s\n", indent, "-c");
×
132
  printf("%s%s%s%s\n", indent, indent, "Configuration directory, default is ", configDir);
×
133
  printf("%s%s\n", indent, "-d");
×
134
  printf("%s%s%s\n", indent, indent, "The name of the database for cosumer, no default ");
×
135
  printf("%s%s\n", indent, "-g");
×
136
  printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag);
×
137
  printf("%s%s\n", indent, "-r");
×
138
  printf("%s%s%s%d\n", indent, indent, "showRowFlag, default is ", g_stConfInfo.showRowFlag);
×
139
  printf("%s%s\n", indent, "-s");
×
140
  printf("%s%s%s%d\n", indent, indent, "saveRowFlag, default is ", g_stConfInfo.saveRowFlag);
×
141
  printf("%s%s\n", indent, "-y");
×
142
  printf("%s%s%s%ds\n", indent, indent, "consume delay, default is ", g_stConfInfo.consumeDelay);
×
143
  printf("%s%s\n", indent, "-e");
×
144
  printf("%s%s%s%d\n", indent, indent, "snapshot, default is ", g_stConfInfo.useSnapshot);
×
145

146
  printf("%s%s\n", indent, "-t");
×
147
  printf("%s%s%s\n", indent, indent, "topic name, default is null");
×
148

149
  printf("%s%s\n", indent, "-x");
×
150
  printf("%s%s%s\n", indent, indent, "consume thread number, default is 1");
×
151

152
  printf("%s%s\n", indent, "-l");
×
153
  printf("%s%s%s%d\n", indent, indent, "run duration unit is minutes, default is ", g_stConfInfo.runDurationMinutes);
×
154
  printf("%s%s\n", indent, "-p");
×
155
  printf("%s%s%s\n", indent, indent, "producer thread number, default is 0");
×
156
  printf("%s%s\n", indent, "-b");
×
157
  printf("%s%s%s\n", indent, indent, "batch size, default is 1");
×
158
  printf("%s%s\n", indent, "-i");
×
159
  printf("%s%s%s\n", indent, indent, "produce rate unit is msgs /s, default is 100000");
×
160
  printf("%s%s\n", indent, "-n");
×
161
  printf("%s%s%s\n", indent, indent, "payload len unit is byte, default is 1000");
×
162

163
  exit(EXIT_SUCCESS);
×
164
}
165

166
char* getCurrentTimeString(char* timeString) {
14,412,632✔
167
  time_t    tTime = taosGetTimestampSec();
14,412,632✔
168
  struct tm tm;
14,292,823✔
169
  taosLocalTime(&tTime, &tm, NULL, 0, NULL);
14,412,632✔
170
  sprintf(timeString, "%d-%02d-%02d %02d:%02d:%02d", tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour,
14,412,317✔
171
          tm.tm_min, tm.tm_sec);
172

173
  return timeString;
14,412,317✔
174
}
175

176
static void tmqStop(int signum, void* info, void* ctx) {
×
177
  running = 0;
×
178
  char tmpString[128];
×
179
  taosFprintfFile(g_fp, "%s tmqStop() receive stop signal[%d]\n", getCurrentTimeString(tmpString), signum);
×
180
}
×
181

182
static void tmqSetSignalHandle() { 
67,091✔
183
  taosSetSignal(SIGINT, tmqStop); 
67,091✔
184
  taosSetSignal(SIGTERM, tmqStop); 
67,091✔
185
}
67,091✔
186

187
void initLogFile() {
67,091✔
188
  char filename[256];
63,087✔
189
  char tmpString[128];
63,087✔
190

191
  pid_t process_id = taosGetPId();
67,091✔
192

193
  if (0 != strlen(g_stConfInfo.topic)) {
67,091✔
UNCOV
194
    sprintf(filename, "/tmp/tmqlog-%d-%s.txt", process_id, getCurrentTimeString(tmpString));
×
195
  } else {
196
    sprintf(filename, "%s/../log/tmqlog-%d-%s.txt", configDir, process_id, getCurrentTimeString(tmpString));
67,091✔
197
  }
198
#ifdef WINDOWS
199
  for (int i = 2; i < sizeof(filename); i++) {
200
    if (filename[i] == ':') filename[i] = '-';
201
    if (filename[i] == '\0') break;
202
  }
203
#endif
204
  TdFilePtr pFile = taosOpenFile(filename, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
67,091✔
205
  if (NULL == pFile) {
67,091✔
UNCOV
206
    fprintf(stderr, "Failed to open %s for save result\n", filename);
×
UNCOV
207
    exit(-1);
×
208
  }
209
  g_fp = pFile;
67,091✔
210
}
67,091✔
211

212
void saveConfigToLogFile() {
67,091✔
213
  taosFprintfFile(g_fp, "###################################################################\n");
67,091✔
214
  taosFprintfFile(g_fp, "# configDir:           %s\n", configDir);
67,091✔
215
  taosFprintfFile(g_fp, "# dbName:              %s\n", g_stConfInfo.dbName);
67,091✔
216
  taosFprintfFile(g_fp, "# cdbName:             %s\n", g_stConfInfo.cdbName);
67,091✔
217
  taosFprintfFile(g_fp, "# showMsgFlag:         %d\n", g_stConfInfo.showMsgFlag);
67,091✔
218
  taosFprintfFile(g_fp, "# showRowFlag:         %d\n", g_stConfInfo.showRowFlag);
67,091✔
219
  taosFprintfFile(g_fp, "# saveRowFlag:         %d\n", g_stConfInfo.saveRowFlag);
67,091✔
220
  taosFprintfFile(g_fp, "# consumeDelay:        %d\n", g_stConfInfo.consumeDelay);
67,091✔
221
  taosFprintfFile(g_fp, "# numOfThread:         %d\n", g_stConfInfo.numOfThread);
67,091✔
222

223
  for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
140,335✔
224
    taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId);
73,244✔
225
    // taosFprintfFile(g_fp, "  auto commit:              %s\n", g_stConfInfo.stThreads[i].autoCommit);
226
    // taosFprintfFile(g_fp, "  auto commit interval ms:  %d\n", g_stConfInfo.stThreads[i].autoCommitIntervalMs);
227
    // taosFprintfFile(g_fp, "  auto offset rest:         %s\n", g_stConfInfo.stThreads[i].autoOffsetRest);
228
    taosFprintfFile(g_fp, "  Topics: ");
73,244✔
229
    for (int j = 0; j < g_stConfInfo.stThreads[i].numOfTopic; j++) {
151,230✔
230
      taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[j]);
77,986✔
231
    }
232
    taosFprintfFile(g_fp, "\n");
73,244✔
233
    taosFprintfFile(g_fp, "  Key: ");
73,244✔
234
    for (int k = 0; k < g_stConfInfo.stThreads[i].numOfKey; k++) {
366,220✔
235
      taosFprintfFile(g_fp, "%s:%s, ", g_stConfInfo.stThreads[i].key[k], g_stConfInfo.stThreads[i].value[k]);
292,976✔
236
    }
237
    taosFprintfFile(g_fp, "\n");
73,244✔
238
    taosFprintfFile(g_fp, "  expect rows: %" PRId64 "\n", g_stConfInfo.stThreads[i].expectMsgCnt);
73,244✔
239
  }
240

241
  char tmpString[128];
63,087✔
242
  taosFprintfFile(g_fp, "# Test time:                %s\n", getCurrentTimeString(tmpString));
67,091✔
243
  taosFprintfFile(g_fp, "###################################################################\n");
67,091✔
244
}
67,091✔
245

246
void parseArgument(int32_t argc, char* argv[]) {
67,091✔
247
  memset(&g_stConfInfo, 0, sizeof(SConfInfo));
67,091✔
248
  g_stConfInfo.showMsgFlag = 0;
67,091✔
249
  g_stConfInfo.showRowFlag = 0;
67,091✔
250
  g_stConfInfo.saveRowFlag = 0;
67,091✔
251
  g_stConfInfo.consumeDelay = 5;
67,091✔
252
  g_stConfInfo.numOfThread = 1;
67,091✔
253
  g_stConfInfo.batchSize = 1;
67,091✔
254
  g_stConfInfo.producers = 0;
67,091✔
255

256
  g_stConfInfo.nowTime = taosGetTimestampMs();
67,091✔
257

258
  for (int32_t i = 1; i < argc; i++) {
520,314✔
259
    if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
453,223✔
UNCOV
260
      printHelp();
×
UNCOV
261
      exit(0);
×
262
    } else if (strcmp(argv[i], "-d") == 0) {
453,223✔
263
      tstrncpy(g_stConfInfo.dbName, argv[++i], sizeof(g_stConfInfo.dbName));
67,091✔
264
    } else if (strcmp(argv[i], "-w") == 0) {
386,132✔
265
      tstrncpy(g_stConfInfo.cdbName, argv[++i], sizeof(g_stConfInfo.cdbName));
67,091✔
266
    } else if (strcmp(argv[i], "-c") == 0) {
319,041✔
267
      tstrncpy(configDir, argv[++i], PATH_MAX);
67,091✔
268
    } else if (strcmp(argv[i], "-g") == 0) {
251,950✔
269
      g_stConfInfo.showMsgFlag = atol(argv[++i]);
67,091✔
270
    } else if (strcmp(argv[i], "-r") == 0) {
184,859✔
271
      g_stConfInfo.showRowFlag = atol(argv[++i]);
67,091✔
272
    } else if (strcmp(argv[i], "-s") == 0) {
117,768✔
UNCOV
273
      g_stConfInfo.saveRowFlag = atol(argv[++i]);
×
274
    } else if (strcmp(argv[i], "-y") == 0) {
117,768✔
275
      g_stConfInfo.consumeDelay = atol(argv[++i]);
67,091✔
276
    } else if (strcmp(argv[i], "-e") == 0) {
50,677✔
277
      g_stConfInfo.useSnapshot = atol(argv[++i]);
50,677✔
278
    } else if (strcmp(argv[i], "-t") == 0) {
×
279
      char tmpBuf[56] = {0};
×
280
      tstrncpy(tmpBuf, argv[++i], sizeof(tmpBuf));
×
281
      sprintf(g_stConfInfo.topic, "`%s`", tmpBuf);
×
282
    } else if (strcmp(argv[i], "-x") == 0) {
×
283
      g_stConfInfo.numOfThread = atol(argv[++i]);
×
284
    } else if (strcmp(argv[i], "-l") == 0) {
×
285
      g_stConfInfo.runDurationMinutes = atol(argv[++i]);
×
286
    } else if (strcmp(argv[i], "-p") == 0) {
×
287
      g_stConfInfo.producers = atol(argv[++i]);
×
288
    } else if (strcmp(argv[i], "-b") == 0) {
×
289
      g_stConfInfo.batchSize = atol(argv[++i]);
×
290
    } else if (strcmp(argv[i], "-i") == 0) {
×
291
      g_stConfInfo.producerRate = atol(argv[++i]);
×
292
    } else if (strcmp(argv[i], "-n") == 0) {
×
293
      g_stConfInfo.payloadLen = atol(argv[++i]);
×
UNCOV
294
      if (g_stConfInfo.payloadLen <= 0 || g_stConfInfo.payloadLen > 1024 * 1024 * 1024) {
×
UNCOV
295
        pError("%s calloc size is too large: %s %s", GREEN, argv[++i], NC);
×
296
        exit(-1);
×
297
      }
298
    } else {
UNCOV
299
      pError("%s unknow para: %s %s", GREEN, argv[++i], NC);
×
UNCOV
300
      exit(-1);
×
301
    }
302
  }
303

304
  g_payload = taosMemoryCalloc(g_stConfInfo.payloadLen + 1, 1);
67,091✔
305
  if (NULL == g_payload) {
67,091✔
UNCOV
306
    pPrint("%s failed to malloc for payload %s", GREEN, NC);
×
UNCOV
307
    exit(-1);
×
308
  }
309

310
  for (int32_t i = 0; i < g_stConfInfo.payloadLen; i++) {
67,091✔
UNCOV
311
    strcpy(&g_payload[i], "a");
×
312
  }
313

314
  initLogFile();
67,091✔
315

316
  taosFprintfFile(g_fp, "====parseArgument() success\n");
67,091✔
317

318
#if 1
319
  pPrint("%s configDir:%s %s", GREEN, configDir, NC);
67,091✔
320
  pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC);
67,091✔
321
  pPrint("%s cdbName:%s %s", GREEN, g_stConfInfo.cdbName, NC);
67,091✔
322
  pPrint("%s consumeDelay:%d %s", GREEN, g_stConfInfo.consumeDelay, NC);
67,091✔
323
  pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC);
67,091✔
324
  pPrint("%s showRowFlag:%d %s", GREEN, g_stConfInfo.showRowFlag, NC);
67,091✔
325
  pPrint("%s saveRowFlag:%d %s", GREEN, g_stConfInfo.saveRowFlag, NC);
67,091✔
326

327
  pPrint("%s snapshot:%d %s", GREEN, g_stConfInfo.useSnapshot, NC);
67,091✔
328

329
  pPrint("%s omb topic:%s %s", GREEN, g_stConfInfo.topic, NC);
67,091✔
330
  pPrint("%s numOfThread:%d %s", GREEN, g_stConfInfo.numOfThread, NC);
67,091✔
331
#endif
332
}
67,091✔
333

334
void splitStr(char** arr, char* str, const char* del) {
×
335
  char* s = strtok(str, del);
×
UNCOV
336
  while (s != NULL) {
×
337
    *arr++ = s;
×
UNCOV
338
    s = strtok(NULL, del);
×
339
  }
UNCOV
340
}
×
341

342
void ltrim(char* str) {
370,962✔
343
  if (str == NULL || *str == '\0') {
370,962✔
UNCOV
344
    return;
×
345
  }
346
  int   len = 0;
370,962✔
347
  char* p = str;
370,962✔
348
  while (*p != '\0' && isspace(*p)) {
3,037,690✔
349
    ++p;
2,666,728✔
350
    ++len;
2,666,728✔
351
  }
352
  memmove(str, p, strlen(str) - len + 1);
370,962✔
353
  // return str;
354
}
355

356
int queryDB(TAOS* taos, char* command) {
73,244✔
357
  int       retryCnt = 10;
73,244✔
358
  int       code = 0;
73,244✔
359
  TAOS_RES* pRes = NULL;
73,244✔
360

361
  while (retryCnt--) {
73,244✔
362
    pRes = taos_query(taos, command);
73,244✔
363
    code = taos_errno(pRes);
73,244✔
364
    if (code != 0) {
73,244✔
365
      taosSsleep(1);
×
366
      taosFprintfFile(g_fp, "queryDB continue, command:%s, code:%d\n", command, code);
×
UNCOV
367
      taos_free_result(pRes);
×
UNCOV
368
      pRes = NULL;
×
UNCOV
369
      continue;
×
370
    }
371
    taos_free_result(pRes);
73,244✔
372
    return 0;
73,244✔
373
  }
374

UNCOV
375
  pError("failed to reason:%s, sql: %s", tstrerror(code), command);
×
UNCOV
376
  taos_free_result(pRes);
×
UNCOV
377
  return -1;
×
378
}
379

380
void addRowsToVgroupId(SThreadInfo* pInfo, int32_t vgroupId, int32_t rows) {
11,060,010✔
381
  int32_t i;
382
  for (i = 0; i < pInfo->numOfVgroups; i++) {
22,054,833✔
383
    if (vgroupId == pInfo->rowsOfPerVgroups[i][0]) {
21,888,253✔
384
      pInfo->rowsOfPerVgroups[i][1] += rows;
10,893,430✔
385
      return;
10,893,430✔
386
    }
387
  }
388

389
  pInfo->rowsOfPerVgroups[pInfo->numOfVgroups][0] = vgroupId;
166,580✔
390
  pInfo->rowsOfPerVgroups[pInfo->numOfVgroups][1] += rows;
166,580✔
391
  pInfo->numOfVgroups++;
166,580✔
392

393
  taosFprintfFile(g_fp, "consume id %d, add new vgroupId:%d\n", pInfo->consumerId, vgroupId);
166,580✔
394
  if (pInfo->numOfVgroups > MAX_VGROUP_CNT) {
166,580✔
395
    taosFprintfFile(g_fp, "====consume id %d, vgroup num %d over than 32. new vgroupId: %d\n", pInfo->consumerId,
×
396
                    pInfo->numOfVgroups, vgroupId);
UNCOV
397
    taosCloseFile(&g_fp);
×
UNCOV
398
    exit(-1);
×
399
  }
400
}
401

402
TAOS* createNewTaosConnect() {
140,335✔
403
  TAOS*   taos = NULL;
140,335✔
404
  int32_t retryCnt = 10;
140,335✔
405

406
  while (retryCnt--) {
140,335✔
407
    taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
140,335✔
408
    if (NULL != taos) {
140,335✔
409
      return taos;
140,335✔
410
    }
411
    taosSsleep(1);
×
412
  }
413

UNCOV
414
  taosFprintfFile(g_fp, "taos_connect() fail\n");
×
415
  return NULL;
×
416
}
417

418
int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) {
×
419
  char sqlStr[1100] = {0};
×
420

421
  if (strlen(buf) > 1024) {
×
UNCOV
422
    taosFprintfFile(g_fp, "The length of one row[%d] is overflow 1024\n", (int)strlen(buf));
×
UNCOV
423
    taosCloseFile(&g_fp);
×
424
    return -1;
×
425
  }
426

427
  TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
×
UNCOV
428
  if (pConn == NULL) {
×
UNCOV
429
    taosFprintfFile(g_fp, "taos_connect() fail, can not save consume result to main script\n");
×
430
    return -1;
×
431
  }
432

433
  sprintf(sqlStr, "insert into %s.content_%d values (%" PRId64 ", \'%s\')", g_stConfInfo.cdbName, pInfo->consumerId,
×
434
          pInfo->ts++, buf);
×
435
  int retCode = queryDB(pConn, sqlStr);
×
436
  if (retCode != 0) {
×
437
    taosFprintfFile(g_fp, "error in save consume content\n");
×
UNCOV
438
    taosCloseFile(&g_fp);
×
UNCOV
439
    taos_close(pConn);
×
440
    exit(-1);
×
441
  }
442

UNCOV
443
  taos_close(pConn);
×
444

UNCOV
445
  return 0;
×
446
}
447

448
static char* shellFormatTimestamp(char* buf, int32_t bufSize, int64_t val, int32_t precision) {
2,147,483,647✔
449
  // if (shell.args.is_raw_time) {
450
  //   sprintf(buf, "%" PRId64, val);
451
  //   return buf;
452
  // }
453

454
  time_t  tt;
2,147,483,647✔
455
  int32_t ms = 0;
2,147,483,647✔
456
  if (precision == TSDB_TIME_PRECISION_NANO) {
2,147,483,647✔
457
    tt = (time_t)(val / 1000000000);
13,750,000✔
458
    ms = val % 1000000000;
13,750,000✔
459
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
2,147,483,647✔
460
    tt = (time_t)(val / 1000000);
13,750,000✔
461
    ms = val % 1000000;
13,750,000✔
462
  } else {
463
    tt = (time_t)(val / 1000);
2,147,483,647✔
464
    ms = val % 1000;
2,147,483,647✔
465
  }
466

467
  if (tt <= 0 && ms < 0) {
2,147,483,647✔
468
    tt--;
×
469
    if (precision == TSDB_TIME_PRECISION_NANO) {
×
UNCOV
470
      ms += 1000000000;
×
471
    } else if (precision == TSDB_TIME_PRECISION_MICRO) {
×
UNCOV
472
      ms += 1000000;
×
473
    } else {
UNCOV
474
      ms += 1000;
×
475
    }
476
  }
477

478
  struct tm ptm;
2,147,483,647✔
479
  if (taosLocalTime(&tt, &ptm, buf, bufSize, NULL) == NULL) {
2,147,483,647✔
UNCOV
480
    return buf;
×
481
  }
482
  size_t pos = taosStrfTime(buf, 35, "%Y-%m-%d %H:%M:%S", &ptm);
2,147,483,647✔
483

484
  if (precision == TSDB_TIME_PRECISION_NANO) {
2,147,483,647✔
485
    sprintf(buf + pos, ".%09d", ms);
13,778,724✔
486
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
2,147,483,647✔
487
    sprintf(buf + pos, ".%06d", ms);
13,750,000✔
488
  } else {
489
    sprintf(buf + pos, ".%03d", ms);
2,147,483,647✔
490
  }
491

492
  return buf;
2,147,483,647✔
493
}
494

495
static void shellDumpFieldToFile(TdFilePtr pFile, const char* val, TAOS_FIELD* field, int32_t length,
2,147,483,647✔
496
                                 int32_t precision) {
497
  if (val == NULL) {
2,147,483,647✔
498
    taosFprintfFile(pFile, "NULL");
129,123,513✔
499
    return;
129,123,513✔
500
  }
501

502
  char quotationStr[2];
2,147,483,647✔
503
  quotationStr[0] = '\"';
2,147,483,647✔
504
  quotationStr[1] = 0;
2,147,483,647✔
505

506
  int  n;
507
  char buf[TSDB_MAX_BYTES_PER_ROW];
2,147,483,647✔
508
  switch (field->type) {
2,147,483,647✔
509
    case TSDB_DATA_TYPE_BOOL:
×
510
      taosFprintfFile(pFile, "%d", ((((int32_t)(*((char*)val))) == 1) ? 1 : 0));
×
511
      break;
×
512
    case TSDB_DATA_TYPE_TINYINT:
×
513
      taosFprintfFile(pFile, "%d", *((int8_t*)val));
×
514
      break;
×
515
    case TSDB_DATA_TYPE_UTINYINT:
×
516
      taosFprintfFile(pFile, "%u", *((uint8_t*)val));
×
517
      break;
×
518
    case TSDB_DATA_TYPE_SMALLINT:
×
519
      taosFprintfFile(pFile, "%d", *((int16_t*)val));
×
520
      break;
×
UNCOV
521
    case TSDB_DATA_TYPE_USMALLINT:
×
UNCOV
522
      taosFprintfFile(pFile, "%u", *((uint16_t*)val));
×
UNCOV
523
      break;
×
524
    case TSDB_DATA_TYPE_INT:
2,147,483,647✔
525
      taosFprintfFile(pFile, "%d", *((int32_t*)val));
2,147,483,647✔
526
      break;
2,147,483,647✔
UNCOV
527
    case TSDB_DATA_TYPE_UINT:
×
UNCOV
528
      taosFprintfFile(pFile, "%u", *((uint32_t*)val));
×
UNCOV
529
      break;
×
530
    case TSDB_DATA_TYPE_BIGINT:
2,147,483,647✔
531
      taosFprintfFile(pFile, "%" PRId64, *((int64_t*)val));
2,147,483,647✔
532
      break;
2,147,483,647✔
UNCOV
533
    case TSDB_DATA_TYPE_UBIGINT:
×
UNCOV
534
      taosFprintfFile(pFile, "%" PRIu64, *((uint64_t*)val));
×
UNCOV
535
      break;
×
536
    case TSDB_DATA_TYPE_FLOAT:
76,765✔
537
      taosFprintfFile(pFile, "%.5f", GET_FLOAT_VAL(val));
76,765✔
538
      break;
76,765✔
539
    case TSDB_DATA_TYPE_DOUBLE:
2,147,483,647✔
540
      n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.9f", length, GET_DOUBLE_VAL(val));
2,147,483,647✔
541
      if (n > TMAX(25, length)) {
2,147,483,647✔
542
        taosFprintfFile(pFile, "%*.15e", length, GET_DOUBLE_VAL(val));
83,994,660✔
543
      } else {
544
        taosFprintfFile(pFile, "%s", buf);
2,147,483,647✔
545
      }
546
      break;
2,147,483,647✔
547
    case TSDB_DATA_TYPE_BINARY:
2,147,483,647✔
548
    case TSDB_DATA_TYPE_VARBINARY:
549
    case TSDB_DATA_TYPE_NCHAR:
550
    case TSDB_DATA_TYPE_JSON:
551
    case TSDB_DATA_TYPE_GEOMETRY: {
552
      int32_t bufIndex = 0;
2,147,483,647✔
553
      for (int32_t i = 0; i < length; i++) {
2,147,483,647✔
554
        buf[bufIndex] = val[i];
2,147,483,647✔
555
        bufIndex++;
2,147,483,647✔
556
        if (val[i] == '\"') {
2,147,483,647✔
UNCOV
557
          buf[bufIndex] = val[i];
×
UNCOV
558
          bufIndex++;
×
559
        }
560
      }
561
      buf[bufIndex] = 0;
2,147,483,647✔
562

563
      taosFprintfFile(pFile, "%s%s%s", quotationStr, buf, quotationStr);
2,147,483,647✔
564
    } break;
2,147,483,647✔
565
    case TSDB_DATA_TYPE_TIMESTAMP:
2,147,483,647✔
566
      shellFormatTimestamp(buf, sizeof(buf), *(int64_t*)val, precision);
2,147,483,647✔
567
      taosFprintfFile(pFile, "%s%s%s", quotationStr, buf, quotationStr);
2,147,483,647✔
568
      break;
2,147,483,647✔
UNCOV
569
    default:
×
UNCOV
570
      break;
×
571
  }
572
}
573

574
static void dumpToFileForCheck(TdFilePtr pFile, TAOS_ROW row, TAOS_FIELD* fields, int32_t* length, int32_t num_fields,
2,147,483,647✔
575
                               int32_t precision) {
576
  for (int32_t i = 0; i < num_fields; i++) {
2,147,483,647✔
577
    if (i > 0) {
2,147,483,647✔
578
      taosFprintfFile(pFile, ",");
2,147,483,647✔
579
    }
580
    shellDumpFieldToFile(pFile, (const char*)row[i], fields + i, length[i], precision);
2,147,483,647✔
581
  }
582
  taosFprintfFile(pFile, "\n");
2,147,483,647✔
583
}
2,147,483,647✔
584

585
static int32_t data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex) {
11,060,010✔
586
  char    buf[1024];
10,972,292✔
587
  int32_t totalRows = 0;
11,060,010✔
588

589
  int32_t     vgroupId = tmq_get_vgroup_id(msg);
11,060,010✔
590
  const char* dbName = tmq_get_db_name(msg);
11,060,010✔
591

592
  char timestring[128] = {0};
11,060,010✔
593
  taosFprintfFile(g_fp, "%s consumerId: %d, msg index:%d\n", getCurrentTimeString(timestring), pInfo->consumerId, msgIndex);
11,060,010✔
594
  int32_t index = 0;
11,060,010✔
595
  for (index = 0; index < pInfo->numOfVgroups; index++) {
22,054,833✔
596
    if (vgroupId == pInfo->rowsOfPerVgroups[index][0]) {
21,888,253✔
597
      break;
10,893,430✔
598
    }
599
  }
600

601
  taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId:%d, currentRows:%d\n", dbName != NULL ? dbName : "invalid table",
11,060,010✔
602
                  tmq_get_topic_name(msg), vgroupId, pInfo->rowsOfPerVgroups[index][1]);
10,972,292✔
603

604
  while (1) {
2,147,483,647✔
605
    TAOS_ROW row = taos_fetch_row(msg);
2,147,483,647✔
606
    if (row == NULL) {
2,147,483,647✔
607
      break;
11,060,010✔
608
    }
609

610
    TAOS_FIELD* fields = taos_fetch_fields(msg);
2,147,483,647✔
611
    int32_t     numOfFields = taos_field_count(msg);
2,147,483,647✔
612
    int32_t*    length = taos_fetch_lengths(msg);
2,147,483,647✔
613
    int32_t     precision = taos_result_precision(msg);
2,147,483,647✔
614
    const char* tbName = tmq_get_table_name(msg);
2,147,483,647✔
615

616
#if 0
617
        // get schema
618
        //============================== stub =================================================//
619
        for (int32_t i = 0; i < numOfFields; i++) {
620
          taosFprintfFile(g_fp, "%02d: name: %s, type: %d, len: %d\n", i, fields[i].name, fields[i].type, fields[i].bytes);
621
        }
622
        //============================== stub =================================================//
623
#endif
624

625
    dumpToFileForCheck(pInfo->pConsumeRowsFile, row, fields, length, numOfFields, precision);
2,147,483,647✔
626
    taos_print_row(buf, row, fields, numOfFields);
2,147,483,647✔
627

628
    if (0 != g_stConfInfo.showRowFlag) {
2,147,483,647✔
629
      taosFprintfFile(g_fp, "time:%" PRId64 " tbname:%s, rows[%d]: %s\n", taosGetTimestampMs(), (tbName != NULL ? tbName : "null table"), totalRows, buf);
2,147,483,647✔
630
      // if (0 != g_stConfInfo.saveRowFlag) {
631
      //   saveConsumeContentToTbl(pInfo, buf);
632
      // }
633
//      taosFsyncFile(g_fp);
634
    }
635

636
    totalRows++;
2,147,483,647✔
637
  }
638

639
  addRowsToVgroupId(pInfo, vgroupId, totalRows);
11,060,010✔
640
  return totalRows;
11,060,010✔
641
}
642

UNCOV
643
static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex) {
×
UNCOV
644
  char    buf[1024];
×
645
  int32_t totalRows = 0;
×
646

647
  // printf("topic: %s\n", tmq_get_topic_name(msg));
648
  int32_t     vgroupId = tmq_get_vgroup_id(msg);
×
649
  const char* dbName = tmq_get_db_name(msg);
×
650

UNCOV
651
  taosFprintfFile(g_fp, "%s consumerId: %d, msg index:%d\n", getCurrentTimeString(buf), pInfo->consumerId, msgIndex);
×
UNCOV
652
  taosFprintfFile(g_fp, "%s dbName: %s, topic: %s, vgroupId: %d\n", getCurrentTimeString(buf),dbName != NULL ? dbName : "invalid table",
×
653
                  tmq_get_topic_name(msg), vgroupId);
654

655
  {
UNCOV
656
    tmq_raw_data raw = {0};
×
UNCOV
657
    int32_t      code = tmq_get_raw(msg, &raw);
×
658

659
    if (code == TSDB_CODE_SUCCESS) {
660
      //          int retCode = queryDB(pInfo->taos, "use metadb");
661
      //          if (retCode != 0) {
662
      //                taosFprintfFile(g_fp, "error when use metadb\n");
663
      //                taosCloseFile(&g_fp);
664
      //                exit(-1);
665
      //          }
666
      //          taosFprintfFile(g_fp, "raw:%p\n", &raw);
667
      //
668
      //      tmq_write_raw(pInfo->taos, raw);
669
    }
670

671
    char* result = tmq_get_json_meta(msg);
×
UNCOV
672
    if (result && strcmp(result, "") != 0) {
×
673
      // printf("meta result: %s\n", result);
UNCOV
674
      taosFprintfFile(pInfo->pConsumeMetaFile, "%s\n", result);
×
675
    }
676
    tmq_free_json_meta(result);
×
677
  }
678

UNCOV
679
  totalRows++;
×
680

UNCOV
681
  return totalRows;
×
682
}
683

684
static void appNothing(void* param, TAOS_RES* res, int32_t numOfRows) {}
125,319✔
685

686
int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
125,319✔
687
  char sqlStr[1024] = {0};
125,319✔
688

689
  // schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
690
  sprintf(sqlStr, "insert into %s.notifyinfo values (%" PRId64 ", %d, %d)", g_stConfInfo.cdbName,
125,319✔
691
          atomic_fetch_add_64(&g_stConfInfo.nowTime, 1), cmdId, pInfo->consumerId);
692

693
  taos_query_a(pInfo->taos, sqlStr, appNothing, NULL);
125,319✔
694

695
  taosFprintfFile(g_fp, "notifyMainScript success, sql: %s\n", sqlStr);
125,319✔
696

697
  return 0;
125,319✔
698
}
699

700
static int32_t g_once_commit_flag = 0;
701

702
static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
2,998,708✔
703
  taosFprintfFile(g_fp, "tmq_commit_cb_print() commit %d\n", code);
2,998,708✔
704

705
  if (0 == g_once_commit_flag && code == 0) {
2,998,708✔
706
    g_once_commit_flag = 1;
57,277✔
707
    notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT);
57,277✔
708
  }
709

710
  char tmpString[128];
2,991,320✔
711
  taosFprintfFile(g_fp, "%s tmq_commit_cb_print() be called\n", getCurrentTimeString(tmpString));
2,998,708✔
712
}
2,998,708✔
713

714
void build_consumer(SThreadInfo* pInfo) {
73,244✔
715
  tmq_conf_t* conf = tmq_conf_new();
73,244✔
716

717
  // tmq_conf_set(conf, "td.connect.ip", "localhost");
718
  // tmq_conf_set(conf, "td.connect.port", "6030");
719
  tmq_conf_set(conf, "td.connect.user", "root");
73,244✔
720
  tmq_conf_set(conf, "td.connect.pass", "taosdata");
73,191✔
721

722
  // tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
723

724
  tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, pInfo);
73,191✔
725

726
  // tmq_conf_set(conf, "group.id", "cgrp1");
727
  for (int32_t i = 0; i < pInfo->numOfKey; i++) {
366,220✔
728
    tmq_conf_set(conf, pInfo->key[i], pInfo->value[i]);
292,976✔
729
  }
730

731
  tmq_conf_set(conf, "msg.with.table.name", "true");
73,244✔
732

733
  // tmq_conf_set(conf, "client.id", "c-001");
734

735
  // tmq_conf_set(conf, "enable.auto.commit", "true");
736
  // tmq_conf_set(conf, "enable.auto.commit", "false");
737

738
  // tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
739

740
  // tmq_conf_set(conf, "auto.offset.reset", "none");
741
  // tmq_conf_set(conf, "auto.offset.reset", "earliest");
742
  // tmq_conf_set(conf, "auto.offset.reset", "latest");
743
  //
744
  if (g_stConfInfo.useSnapshot) {
73,244✔
745
    tmq_conf_set(conf, "experimental.snapshot.enable", "true");
24,329✔
746
  }
747

748
  pInfo->tmq = tmq_consumer_new(conf, NULL, 0);
73,244✔
749
  tmq_conf_destroy(conf);
73,244✔
750
  return;
73,244✔
751
}
752

753
void build_topic_list(SThreadInfo* pInfo) {
73,244✔
754
  pInfo->topicList = tmq_list_new();
73,244✔
755
  // tmq_list_append(topic_list, "test_stb_topic_1");
756
  for (int32_t i = 0; i < pInfo->numOfTopic; i++) {
151,230✔
757
    tmq_list_append(pInfo->topicList, pInfo->topics[i]);
77,986✔
758
  }
759
  return;
73,244✔
760
}
761

762
int32_t saveConsumeResult(SThreadInfo* pInfo) {
73,244✔
763
  char sqlStr[1024] = {0};
73,244✔
764
  // schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
765
  sprintf(sqlStr, "insert into %s.consumeresult values (%" PRId64 ", %d, %" PRId64 ", %" PRId64 ", %d)",
73,244✔
766
          g_stConfInfo.cdbName, atomic_fetch_add_64(&g_stConfInfo.nowTime, 1), pInfo->consumerId, pInfo->consumeMsgCnt,
767
          pInfo->consumeRowCnt, pInfo->checkresult);
768

769
  char tmpString[128];
67,679✔
770
  taosFprintfFile(g_fp, "%s, consume id %d result: %s\n", getCurrentTimeString(tmpString), pInfo->consumerId, sqlStr);
73,244✔
771

772
  int retCode = queryDB(pInfo->taos, sqlStr);
73,244✔
773
  if (retCode != 0) {
73,244✔
UNCOV
774
    taosFprintfFile(g_fp, "consume id %d error in save consume result\n", pInfo->consumerId);
×
UNCOV
775
    return -1;
×
776
  }
777

778
  return 0;
73,244✔
779
}
780

781
void loop_consume(SThreadInfo* pInfo) {
73,244✔
782
  int32_t code;
783

784
  int32_t once_flag = 0;
73,244✔
785

786
  int64_t totalMsgs = 0;
73,244✔
787
  int64_t totalRows = 0;
73,244✔
788

789
  char tmpString[128];
67,679✔
790
  taosFprintfFile(g_fp, "%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString),
73,244✔
791
                  pInfo->consumerId);
792

793
  pInfo->ts = taosGetTimestampMs();
73,244✔
794

795
  if (pInfo->ifCheckData) {
73,244✔
796
    char filename[256] = {0};
45,233✔
797
    memset(tmpString, 0, tListLen(tmpString));
45,233✔
798

799
    // sprintf(filename, "%s/../log/consumerid_%d_%s.txt", configDir, pInfo->consumerId,
800
    // getCurrentTimeString(tmpString));
801
    sprintf(filename, "%s/../log/consumerid_%d.txt", configDir, pInfo->consumerId);
45,233✔
802
    pInfo->pConsumeRowsFile = taosOpenFile(filename, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
45,233✔
803

804
    sprintf(filename, "%s/../log/meta_consumerid_%d.txt", configDir, pInfo->consumerId);
45,233✔
805
    pInfo->pConsumeMetaFile = taosOpenFile(filename, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
45,233✔
806

807
    if (pInfo->pConsumeRowsFile == NULL || pInfo->pConsumeMetaFile == NULL) {
45,233✔
UNCOV
808
      taosFprintfFile(g_fp, "%s create file fail for save rows or save meta\n", getCurrentTimeString(tmpString));
×
UNCOV
809
      return;
×
810
    }
811
  }
812

813
  int64_t  lastTotalMsgs = 0;
73,244✔
814
  uint64_t lastPrintTime = taosGetTimestampMs();
73,244✔
815
  uint64_t startTs = taosGetTimestampMs();
73,244✔
816

817
  int32_t consumeDelay = g_stConfInfo.consumeDelay == -1 ? -1 : (g_stConfInfo.consumeDelay * 1000);
73,244✔
818
  while (running) {
11,094,173✔
819
    TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, consumeDelay);
11,094,173✔
820
    if (tmqMsg) {
11,094,173✔
821
      if (0 != g_stConfInfo.showMsgFlag) {
11,060,010✔
822
        tmq_res_t msgType = tmq_get_res_type(tmqMsg);
11,060,010✔
823
        if (msgType == TMQ_RES_TABLE_META) {
11,060,010✔
824
          totalRows += meta_msg_process(tmqMsg, pInfo, totalMsgs);
×
825
        } else if (msgType == TMQ_RES_DATA) {
11,060,010✔
826
          totalRows += data_msg_process(tmqMsg, pInfo, totalMsgs);
11,060,010✔
UNCOV
827
        } else if (msgType == TMQ_RES_METADATA) {
×
UNCOV
828
          meta_msg_process(tmqMsg, pInfo, totalMsgs);
×
UNCOV
829
          totalRows += data_msg_process(tmqMsg, pInfo, totalMsgs);
×
830
        }
831
      }
832

833
      taos_free_result(tmqMsg);
11,060,010✔
834
      totalMsgs++;
11,060,010✔
835

836
      int64_t currentPrintTime = taosGetTimestampMs();
11,060,010✔
837
      if (currentPrintTime - lastPrintTime > 10 * 1000) {
11,060,010✔
838
        taosFprintfFile(
38,837✔
839
            g_fp, "consumer id %d has currently poll total msgs: %" PRId64 ", period rate: %.3f msgs/second\n",
840
            pInfo->consumerId, totalMsgs, (totalMsgs - lastTotalMsgs) * 1000.0 / (currentPrintTime - lastPrintTime));
38,837✔
841
        lastPrintTime = currentPrintTime;
38,837✔
842
        lastTotalMsgs = totalMsgs;
38,837✔
843
      }
844

845
      if (0 == once_flag) {
11,060,010✔
846
        once_flag = 1;
68,042✔
847
        notifyMainScript(pInfo, NOTIFY_CMD_START_CONSUM);
68,042✔
848
      }
849

850
      if ((totalRows >= pInfo->expectMsgCnt) || (totalMsgs >= pInfo->expectMsgCnt)) {
11,060,010✔
851
        memset(tmpString, 0, tListLen(tmpString));
39,081✔
852
        taosFprintfFile(g_fp, "%s over than expect rows, so break consume\n", getCurrentTimeString(tmpString));
39,081✔
853
        break;
39,081✔
854
      }
855
    } else {
856
      memset(tmpString, 0, tListLen(tmpString));
34,163✔
857
      taosFprintfFile(g_fp, "%s no poll more msg when time over, break consume\n", getCurrentTimeString(tmpString));
34,163✔
858
      break;
34,163✔
859
    }
860
  }
861

862
  if (0 == running) {
73,244✔
UNCOV
863
    taosFprintfFile(g_fp, "receive stop signal and not continue consume\n");
×
864
  }
865

866
  pInfo->consumeMsgCnt = totalMsgs;
73,244✔
867
  pInfo->consumeRowCnt = totalRows;
73,244✔
868

869
  taosFprintfFile(g_fp, "==== consumerId: %d, consumeMsgCnt: %" PRId64 ", consumeRowCnt: %" PRId64 "\n",
73,244✔
870
                  pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt);
871

872
  if(taosFsyncFile(pInfo->pConsumeRowsFile) < 0){
73,244✔
UNCOV
873
    printf("taosFsyncFile error:%s", strerror(errno));
×
874
  }
875
  taosCloseFile(&pInfo->pConsumeRowsFile);
73,244✔
876
}
877

878
void* consumeThreadFunc(void* param) {
73,244✔
879
  SThreadInfo* pInfo = (SThreadInfo*)param;
73,244✔
880

881
  pInfo->taos = createNewTaosConnect();
73,244✔
882
  if (pInfo->taos == NULL) {
73,244✔
UNCOV
883
    taosFprintfFile(g_fp, "taos_connect() fail, can not notify and save consume result to main scripte\n");
×
UNCOV
884
    return NULL;
×
885
  }
886

887
  build_consumer(pInfo);
73,244✔
888
  build_topic_list(pInfo);
73,244✔
889
  if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) {
73,244✔
890
    taosFprintfFile(g_fp, "create consumer fail! tmq is null or topicList is null\n");
×
UNCOV
891
    taos_close(pInfo->taos);
×
UNCOV
892
    pInfo->taos = NULL;
×
UNCOV
893
    return NULL;
×
894
  }
895

896
  int32_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList);
73,244✔
897
  if (err != 0) {
73,244✔
898
    pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
×
899
    taosFprintfFile(g_fp, "tmq_subscribe() fail! reason: %s\n", tmq_err2str(err));
×
UNCOV
900
    taos_close(pInfo->taos);
×
UNCOV
901
    pInfo->taos = NULL;
×
UNCOV
902
    return NULL;
×
903
  }
904

905
  tmq_list_destroy(pInfo->topicList);
73,244✔
906
  pInfo->topicList = NULL;
73,244✔
907

908
  loop_consume(pInfo);
73,244✔
909

910
  if (pInfo->ifManualCommit) {
73,244✔
911
    pPrint("tmq_commit() manual commit when consume end.\n");
59,953✔
912
    /*tmq_commit(pInfo->tmq, NULL, 0);*/
913
    tmq_commit_sync(pInfo->tmq, NULL);
59,953✔
914
    tmq_commit_cb_print(pInfo->tmq, 0, pInfo);
59,953✔
915
    taosFprintfFile(g_fp, "tmq_commit() manual commit over.\n");
59,953✔
916
    pPrint("tmq_commit() manual commit over.\n");
59,953✔
917
  }
918

919
  err = tmq_unsubscribe(pInfo->tmq);
73,244✔
920
  if (err != 0) {
73,244✔
UNCOV
921
    pError("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
×
UNCOV
922
    taosFprintfFile(g_fp, "tmq_unsubscribe()! reason: %s\n", tmq_err2str(err));
×
923
  }
924

925
  err = tmq_consumer_close(pInfo->tmq);
73,244✔
926
  if (err != 0) {
73,244✔
UNCOV
927
    pError("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
×
UNCOV
928
    taosFprintfFile(g_fp, "tmq_consumer_close()! reason: %s\n", tmq_err2str(err));
×
929
  }
930
  pInfo->tmq = NULL;
73,244✔
931

932
  // save consume result into consumeresult table
933
  saveConsumeResult(pInfo);
73,244✔
934

935
  // save rows from per vgroup
936
  taosFprintfFile(g_fp, "======== consumerId: %d, consume rows from per vgroups ========\n", pInfo->consumerId);
73,244✔
937
  for (int32_t i = 0; i < pInfo->numOfVgroups; i++) {
239,824✔
938
    taosFprintfFile(g_fp, "vgroups: %04d, rows: %d\n", pInfo->rowsOfPerVgroups[i][0], pInfo->rowsOfPerVgroups[i][1]);
166,580✔
939
  }
940

941
  taos_close(pInfo->taos);
73,244✔
942
  pInfo->taos = NULL;
73,244✔
943

944
  return NULL;
73,244✔
945
}
946

947
void parseConsumeInfo() {
67,091✔
948
  char*      token;
949
  const char delim[2] = ",";
67,091✔
950
  const char ch = ':';
67,091✔
951

952
  for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
140,335✔
953
    token = strtok(g_stConfInfo.stThreads[i].topicString, delim);
73,244✔
954
    while (token != NULL) {
151,230✔
955
      // printf("%s\n", token );
956
      tstrncpy(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic], token,
77,986✔
957
               sizeof(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]));
958
      ltrim(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]);
77,986✔
959
      // printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
960
      g_stConfInfo.stThreads[i].numOfTopic++;
77,986✔
961

962
      token = strtok(NULL, delim);
77,986✔
963
    }
964

965
    token = strtok(g_stConfInfo.stThreads[i].keyString, delim);
73,244✔
966
    while (token != NULL) {
366,220✔
967
      // printf("%s\n", token );
968
      {
969
        char* pstr = token;
292,976✔
970
        ltrim(pstr);
292,976✔
971
        char* ret = strchr(pstr, ch);
292,976✔
972
        memcpy(g_stConfInfo.stThreads[i].key[g_stConfInfo.stThreads[i].numOfKey], pstr, ret - pstr);
292,976✔
973
        tstrncpy(g_stConfInfo.stThreads[i].value[g_stConfInfo.stThreads[i].numOfKey], ret + 1,
292,976✔
974
                 sizeof(g_stConfInfo.stThreads[i].value[g_stConfInfo.stThreads[i].numOfKey]));
975
        // printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey],
976
        // g_stConfInfo.value[g_stConfInfo.numOfKey]);
977
        g_stConfInfo.stThreads[i].numOfKey++;
292,976✔
978
      }
979

980
      token = strtok(NULL, delim);
292,976✔
981
    }
982
  }
983
}
67,091✔
984

985
int32_t getConsumeInfo() {
67,091✔
986
  char sqlStr[1024] = {0};
67,091✔
987

988
  TAOS* pConn = createNewTaosConnect();
67,091✔
989
  if (pConn == NULL) {
67,091✔
UNCOV
990
    taosFprintfFile(g_fp, "taos_connect() fail, can not get consume info for start consumer\n");
×
UNCOV
991
    return -1;
×
992
  }
993

994
  sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.cdbName);
67,091✔
995
  TAOS_RES* pRes = taos_query(pConn, sqlStr);
67,091✔
996
  if (taos_errno(pRes) != 0) {
67,091✔
997
    taosFprintfFile(g_fp, "error in get consumeinfo for %s\n", taos_errstr(pRes));
×
998
    taosCloseFile(&g_fp);
×
UNCOV
999
    taos_free_result(pRes);
×
UNCOV
1000
    taos_close(pConn);
×
UNCOV
1001
    return -1;
×
1002
  }
1003

1004
  TAOS_ROW    row = NULL;
67,091✔
1005
  int         num_fields = taos_num_fields(pRes);
67,091✔
1006
  TAOS_FIELD* fields = taos_fetch_fields(pRes);
67,091✔
1007

1008
  // schema: ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint,
1009
  // ifcheckdata int
1010

1011
  int32_t numOfThread = 0;
67,091✔
1012
  while ((row = taos_fetch_row(pRes))) {
140,335✔
1013
    int32_t* lengths = taos_fetch_lengths(pRes);
73,244✔
1014

1015
    // set default value
1016
    // g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = 5000;
1017
    // memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, "true", strlen("true"));
1018
    // memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, "earlieast", strlen("earlieast"));
1019

1020
    for (int i = 0; i < num_fields; ++i) {
584,947✔
1021
      if (row[i] == NULL || 0 == i) {
511,703✔
1022
        continue;
73,244✔
1023
      }
1024

1025
      if ((1 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
438,459✔
1026
        g_stConfInfo.stThreads[numOfThread].consumerId = *((int32_t*)row[i]);
73,244✔
1027
      } else if ((2 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
365,215✔
1028
        memcpy(g_stConfInfo.stThreads[numOfThread].topicString, row[i], lengths[i]);
73,244✔
1029
      } else if ((3 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
291,971✔
1030
        memcpy(g_stConfInfo.stThreads[numOfThread].keyString, row[i], lengths[i]);
73,244✔
1031
      } else if ((4 == i) && (fields[i].type == TSDB_DATA_TYPE_BIGINT)) {
218,727✔
1032
        g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t*)row[i]);
73,244✔
1033
      } else if ((5 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
145,483✔
1034
        g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t*)row[i]);
73,244✔
1035
      } else if ((6 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
72,239✔
1036
        g_stConfInfo.stThreads[numOfThread].ifManualCommit = *((int32_t*)row[i]);
72,239✔
1037
      }
1038
    }
1039
    numOfThread++;
73,244✔
1040
  }
1041
  g_stConfInfo.numOfThread = numOfThread;
67,091✔
1042

1043
  taos_free_result(pRes);
67,091✔
1044

1045
  parseConsumeInfo();
67,091✔
1046
  taos_close(pConn);
67,091✔
1047

1048
  return 0;
67,091✔
1049
}
1050

1051
static int32_t omb_data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex, int64_t* lenOfRows) {
×
UNCOV
1052
  char    buf[16 * 1024];
×
UNCOV
1053
  int32_t totalRows = 0;
×
UNCOV
1054
  int32_t totalLen = 0;
×
1055

1056
  // printf("topic: %s\n", tmq_get_topic_name(msg));
1057
  // int32_t     vgroupId = tmq_get_vgroup_id(msg);
1058
  // const char* dbName = tmq_get_db_name(msg);
1059

1060
  // taosFprintfFile(g_fp, "consumerId: %d, msg index:%" PRId64 "\n", pInfo->consumerId, msgIndex);
1061
  // taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId: %d\n", dbName != NULL ? dbName : "invalid table",
1062
  //                 tmq_get_topic_name(msg), vgroupId);
1063

1064
  while (1) {
×
UNCOV
1065
    TAOS_ROW row = taos_fetch_row(msg);
×
1066

1067
    if (row == NULL) break;
×
1068

UNCOV
1069
    TAOS_FIELD* fields = taos_fetch_fields(msg);
×
UNCOV
1070
    int32_t     numOfFields = taos_field_count(msg);
×
1071
    // int32_t*    length = taos_fetch_lengths(msg);
1072
    // int32_t     precision = taos_result_precision(msg);
1073
    // const char* tbName = tmq_get_table_name(msg);
1074

UNCOV
1075
    taos_print_row(buf, row, fields, numOfFields);
×
UNCOV
1076
    totalLen += strlen(buf);
×
1077
    totalRows++;
×
1078
  }
1079

UNCOV
1080
  *lenOfRows = totalLen;
×
1081
  return totalRows;
×
1082
}
1083

1084
void omb_loop_consume(SThreadInfo* pInfo) {
×
1085
  int32_t code;
1086

1087
  int32_t once_flag = 0;
×
1088

1089
  int64_t totalMsgs = 0;
×
1090
  int64_t totalRows = 0;
×
1091

1092
  char tmpString[128];
×
UNCOV
1093
  taosFprintfFile(g_fp, "%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString),
×
1094
                  pInfo->consumerId);
UNCOV
1095
  printf("%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString), pInfo->consumerId);
×
1096

1097
  pInfo->ts = taosGetTimestampMs();
×
1098

UNCOV
1099
  int64_t  lastTotalMsgs = 0;
×
1100
  uint64_t lastPrintTime = taosGetTimestampMs();
×
1101
  uint64_t startTs = taosGetTimestampMs();
×
1102

1103
  int64_t totalLenOfMsg = 0;
×
1104
  int64_t lastTotalLenOfMsg = 0;
×
1105
  int32_t consumeDelay = g_stConfInfo.consumeDelay == -1 ? -1 : (g_stConfInfo.consumeDelay * 1000);
×
1106
  while (running) {
×
1107
    TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, consumeDelay);
×
1108
    if (tmqMsg) {
×
1109
      int64_t lenOfMsg = 0;
×
1110
      totalRows += omb_data_msg_process(tmqMsg, pInfo, totalMsgs, &lenOfMsg);
×
1111
      totalLenOfMsg += lenOfMsg;
×
1112
      taos_free_result(tmqMsg);
×
1113
      totalMsgs++;
×
1114
      int64_t currentPrintTime = taosGetTimestampMs();
×
1115
      if (currentPrintTime - lastPrintTime > 10 * 1000) {
×
UNCOV
1116
        int64_t currentLenOfMsg = totalLenOfMsg - lastTotalLenOfMsg;
×
1117
        int64_t deltaTime = currentPrintTime - lastPrintTime;
×
1118
        printf("consumer id %d has currently cons total rows: %" PRId64 ", msgs: %" PRId64
×
1119
               ", rate: %.3f msgs/s, %.1f MB/s\n",
1120
               pInfo->consumerId, totalRows, totalMsgs, (totalMsgs - lastTotalMsgs) * 1000.0 / deltaTime,
×
UNCOV
1121
               currentLenOfMsg * 1000.0 / (1024 * 1024) / deltaTime);
×
1122

1123
        taosFprintfFile(g_fp,
×
1124
                        "consumer id %d has currently poll total msgs: %" PRId64
1125
                        ", period cons rate: %.3f msgs/s, %.1f MB/s\n",
1126
                        pInfo->consumerId, totalMsgs, (totalMsgs - lastTotalMsgs) * 1000.0 / deltaTime,
×
1127
                        currentLenOfMsg * 1000.0 / deltaTime);
×
UNCOV
1128
        lastPrintTime = currentPrintTime;
×
UNCOV
1129
        lastTotalMsgs = totalMsgs;
×
1130
        lastTotalLenOfMsg = totalLenOfMsg;
×
1131
      }
1132
    } else {
1133
      memset(tmpString, 0, tListLen(tmpString));
×
1134
      taosFprintfFile(g_fp, "%s no poll more msg when time over, break consume\n", getCurrentTimeString(tmpString));
×
1135
      printf("%s no poll more msg when time over, break consume\n", getCurrentTimeString(tmpString));
×
1136
      int64_t currentPrintTime = taosGetTimestampMs();
×
UNCOV
1137
      int64_t currentLenOfMsg = totalLenOfMsg - lastTotalLenOfMsg;
×
1138
      int64_t deltaTime = currentPrintTime - lastPrintTime;
×
1139
      printf("consumer id %d has currently cons total rows: %" PRId64 ", msgs: %" PRId64
×
1140
             ", rate: %.3f msgs/s, %.1f MB/s\n",
UNCOV
1141
             pInfo->consumerId, totalRows, totalMsgs, (totalMsgs - lastTotalMsgs) * 1000.0 / deltaTime,
×
UNCOV
1142
             currentLenOfMsg * 1000.0 / (1024 * 1024) / deltaTime);
×
UNCOV
1143
      break;
×
1144
    }
1145
  }
1146

1147
  pInfo->consumeMsgCnt = totalMsgs;
×
UNCOV
1148
  pInfo->consumeRowCnt = totalRows;
×
1149
  pInfo->consumeLen = totalLenOfMsg;
×
1150
}
×
1151

UNCOV
1152
void* ombConsumeThreadFunc(void* param) {
×
1153
  SThreadInfo* pInfo = (SThreadInfo*)param;
×
1154

1155
  //################### set key ########################
1156
  tmq_conf_t* conf = tmq_conf_new();
×
1157
  // tmq_conf_set(conf, "td.connect.ip", "localhost");
1158
  // tmq_conf_set(conf, "td.connect.port", "6030");
1159
  tmq_conf_set(conf, "td.connect.user", "root");
×
1160
  tmq_conf_set(conf, "td.connect.pass", "taosdata");
×
1161
  // tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
UNCOV
1162
  tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, pInfo);
×
UNCOV
1163
  tmq_conf_set(conf, "group.id", "ombCgrp");
×
1164
  // tmq_conf_set(conf, "msg.with.table.name", "true");
1165
  // tmq_conf_set(conf, "client.id", "c-001");
1166
  // tmq_conf_set(conf, "enable.auto.commit", "true");
UNCOV
1167
  tmq_conf_set(conf, "enable.auto.commit", "false");
×
1168
  // tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
1169
  // tmq_conf_set(conf, "auto.offset.reset", "none");
1170
  // tmq_conf_set(conf, "auto.offset.reset", "earliest");
1171
  tmq_conf_set(conf, "auto.offset.reset", "earliest");
×
1172
  //
UNCOV
1173
  if (g_stConfInfo.useSnapshot) {
×
1174
    tmq_conf_set(conf, "experimental.snapshot.enable", "true");
×
1175
  }
1176

UNCOV
1177
  pInfo->tmq = tmq_consumer_new(conf, NULL, 0);
×
1178

1179
  tmq_conf_destroy(conf);
×
1180

1181
  //################### set topic ##########################
1182
  pInfo->topicList = tmq_list_new();
×
1183
  tmq_list_append(pInfo->topicList, g_stConfInfo.topic);
×
1184

UNCOV
1185
  if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) {
×
UNCOV
1186
    taosFprintfFile(g_fp, "create consumer fail! tmq is null or topicList is null\n");
×
1187
    return NULL;
×
1188
  }
1189

1190
  int32_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList);
×
1191
  if (err != 0) {
×
UNCOV
1192
    pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
×
UNCOV
1193
    taosFprintfFile(g_fp, "tmq_subscribe() fail! reason: %s\n", tmq_err2str(err));
×
1194
    return NULL;
×
1195
  }
1196

1197
  tmq_list_destroy(pInfo->topicList);
×
UNCOV
1198
  pInfo->topicList = NULL;
×
1199

1200
  omb_loop_consume(pInfo);
×
1201

1202
  err = tmq_unsubscribe(pInfo->tmq);
×
UNCOV
1203
  if (err != 0) {
×
UNCOV
1204
    pError("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
×
1205
    taosFprintfFile(g_fp, "tmq_unsubscribe()! reason: %s\n", tmq_err2str(err));
×
1206
  }
1207

1208
  err = tmq_consumer_close(pInfo->tmq);
×
UNCOV
1209
  if (err != 0) {
×
1210
    pError("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
×
UNCOV
1211
    taosFprintfFile(g_fp, "tmq_consumer_close()! reason: %s\n", tmq_err2str(err));
×
1212
  }
UNCOV
1213
  pInfo->tmq = NULL;
×
1214

1215
  return NULL;
×
1216
}
1217

UNCOV
1218
static int queryDbExec(TAOS* taos, char* command, QUERY_TYPE type) {
×
1219
  TAOS_RES* res = taos_query(taos, command);
×
1220
  int32_t   code = taos_errno(res);
×
1221

1222
  if (code != 0) {
×
UNCOV
1223
    pPrint("%s Failed to execute <%s>, reason: %s %s", GREEN, command, taos_errstr(res), NC);
×
UNCOV
1224
    taos_free_result(res);
×
1225
    return -1;
×
1226
  }
1227

1228
  if (INSERT_TYPE == type) {
×
UNCOV
1229
    int affectedRows = taos_affected_rows(res);
×
UNCOV
1230
    taos_free_result(res);
×
1231
    return affectedRows;
×
1232
  }
1233

UNCOV
1234
  taos_free_result(res);
×
1235
  return 0;
×
1236
}
1237

1238
void* ombProduceThreadFunc(void* param) {
×
1239
  SThreadInfo* pInfo = (SThreadInfo*)param;
×
1240

1241
  pInfo->taos = createNewTaosConnect();
×
UNCOV
1242
  if (pInfo->taos == NULL) {
×
UNCOV
1243
    taosFprintfFile(g_fp, "taos_connect() fail, can not start producers!\n");
×
1244
    return NULL;
×
1245
  }
1246

1247
  int64_t affectedRowsTotal = 0;
×
1248
  int64_t sendMsgs = 0;
×
1249

1250
  uint32_t totalSendLoopTimes =
×
1251
      g_stConfInfo.runDurationMinutes * 60 * 1000 / SEND_TIME_UNIT;  // send some msgs per 10ms
×
1252
  uint32_t batchPerTblTimes = pInfo->producerRate / 100 / g_stConfInfo.batchSize;
×
UNCOV
1253
  uint32_t remainder = (pInfo->producerRate / 100) % g_stConfInfo.batchSize;
×
UNCOV
1254
  if (remainder) {
×
1255
    batchPerTblTimes += 1;
×
1256
  }
1257

1258
  char* sqlBuf = taosMemoryMalloc(MAX_SQL_LEN);
×
1259
  if (NULL == sqlBuf) {
×
1260
    printf("malloc fail for sqlBuf\n");
×
UNCOV
1261
    taos_close(pInfo->taos);
×
UNCOV
1262
    pInfo->taos = NULL;
×
1263
    return NULL;
×
1264
  }
1265

1266
  printf("Produce Info: totalSendLoopTimes: %d, batchPerTblTimes: %d, producerRate: %d\n", totalSendLoopTimes,
×
1267
         batchPerTblTimes, pInfo->producerRate);
1268

1269
  char ctbName[128] = {0};
×
1270
  sprintf(ctbName, "%s.ctb%d", g_stConfInfo.dbName, pInfo->consumerId);
×
1271

1272
  int64_t lastPrintTime = taosGetTimestampUs();
×
1273
  int64_t totalMsgLen = 0;
×
1274
  // int64_t timeStamp = taosGetTimestampUs();
1275
  while (totalSendLoopTimes) {
×
1276
    int64_t startTs = taosGetTimestampUs();
×
1277
    for (int i = 0; i < batchPerTblTimes; ++i) {
×
UNCOV
1278
      uint32_t msgsOfSql = g_stConfInfo.batchSize;
×
1279
      if ((i == batchPerTblTimes - 1) && (0 != remainder)) {
×
1280
        msgsOfSql = remainder;
×
1281
      }
1282
      int len = 0;
×
1283
      len += tsnprintf(sqlBuf + len, MAX_SQL_LEN - len, "insert into %s values ", ctbName);
×
1284
      for (int j = 0; j < msgsOfSql; j++) {
×
1285
        int64_t timeStamp = taosGetTimestampNs();
×
UNCOV
1286
        len += tsnprintf(sqlBuf + len, MAX_SQL_LEN - len, "(%" PRId64 ", \"%s\")", timeStamp, g_payload);
×
UNCOV
1287
        sendMsgs++;
×
1288
        pInfo->totalProduceMsgs++;
×
1289
      }
1290

1291
      totalMsgLen += len;
×
1292
      pInfo->totalMsgsLen += len;
×
1293

1294
      int64_t affectedRows = queryDbExec(pInfo->taos, sqlBuf, INSERT_TYPE);
×
1295
      if (affectedRows < 0) {
×
1296
        taos_close(pInfo->taos);
×
UNCOV
1297
        pInfo->taos = NULL;
×
UNCOV
1298
        taosMemoryFree(sqlBuf);
×
1299
        return NULL;
×
1300
      }
1301

UNCOV
1302
      affectedRowsTotal += affectedRows;
×
1303

1304
      // printf("Produce Info: affectedRows: %" PRId64 "\n", affectedRows);
1305
    }
1306
    totalSendLoopTimes -= 1;
×
1307

1308
    // calc spent time
1309
    int64_t currentTs = taosGetTimestampUs();
×
UNCOV
1310
    int64_t delta = currentTs - startTs;
×
1311
    if (delta < SEND_TIME_UNIT * 1000) {
×
UNCOV
1312
      int64_t sleepLen = (int32_t)(SEND_TIME_UNIT * 1000 - delta);
×
1313
      // printf("sleep %" PRId64 " us, use time: %" PRId64 " us\n", sleepLen, delta);
1314
      taosUsleep((int32_t)sleepLen);
×
1315
    }
1316

1317
    currentTs = taosGetTimestampUs();
×
UNCOV
1318
    delta = currentTs - lastPrintTime;
×
1319
    if (delta > 10 * 1000 * 1000) {
×
1320
      printf("producer[%d] info: %" PRId64 " msgs, %" PRId64 " Byte, %" PRId64 " us, totalSendLoopTimes: %d\n",
×
1321
             pInfo->consumerId, sendMsgs, totalMsgLen, delta, totalSendLoopTimes);
1322
      printf("producer[%d] rate: %1.f msgs/s, %1.f KB/s\n", pInfo->consumerId, sendMsgs * 1000.0 * 1000 / delta,
×
1323
             (totalMsgLen / 1024.0) / (delta / (1000 * 1000)));
×
UNCOV
1324
      lastPrintTime = currentTs;
×
UNCOV
1325
      sendMsgs = 0;
×
UNCOV
1326
      totalMsgLen = 0;
×
1327
    }
1328
  }
1329

1330
  printf("affectedRowsTotal: %" PRId64 "\n", affectedRowsTotal);
×
1331
  taos_close(pInfo->taos);
×
UNCOV
1332
  pInfo->taos = NULL;
×
UNCOV
1333
  taosMemoryFree(sqlBuf);
×
1334
  return NULL;
×
1335
}
1336

1337
void printProduceInfo(int64_t start) {
×
1338
  int64_t totalMsgs = 0;
×
1339
  int64_t totalLenOfMsgs = 0;
×
UNCOV
1340
  for (int i = 0; i < g_stConfInfo.producers; i++) {
×
UNCOV
1341
    totalMsgs += g_stConfInfo.stProdThreads[i].totalProduceMsgs;
×
1342
    totalLenOfMsgs += g_stConfInfo.stProdThreads[i].totalMsgsLen;
×
1343
  }
1344

1345
  int64_t end = taosGetTimestampUs();
×
1346

1347
  int64_t t = end - start;
×
1348
  if (0 == t) t = 1;
×
1349

1350
  double tInMs = (double)t / 1000000.0;
×
1351
  printf("Spent %.3f seconds to prod %" PRIu64 " msgs, %" PRIu64 " Byte\n\n", tInMs, totalMsgs, totalLenOfMsgs);
×
1352

1353
  printf("Spent %.3f seconds to prod %" PRIu64 " msgs with %d producer(s), throughput: %.3f msgs/s, %.1f MB/s\n\n",
×
UNCOV
1354
         tInMs, totalMsgs, g_stConfInfo.producers, (double)totalMsgs / tInMs,
×
UNCOV
1355
         (double)totalLenOfMsgs / (1024.0 * 1024) / tInMs);
×
1356
  return;
×
1357
}
1358

1359
void startOmbConsume() {
×
UNCOV
1360
  TdThreadAttr thattr;
×
1361
  taosThreadAttrInit(&thattr);
×
1362
  taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);
×
1363

1364
  if (0 != g_stConfInfo.producers) {
×
1365
    TAOS* taos = createNewTaosConnect();
×
UNCOV
1366
    if (taos == NULL) {
×
UNCOV
1367
      taosFprintfFile(g_fp, "taos_connect() fail, can not create db, stbl, ctbl, topic!\n");
×
1368
      return;
×
1369
    }
1370

1371
    char stbName[16] = "stb";
×
1372
    char ctbPrefix[16] = "ctb";
×
1373

1374
    char sql[256] = {0};
×
UNCOV
1375
    sprintf(sql, "drop database if exists %s", g_stConfInfo.dbName);
×
1376
    printf("SQL: %s\n", sql);
×
UNCOV
1377
    queryDbExec(taos, sql, NO_INSERT_TYPE);
×
1378

1379
    sprintf(sql, "create database if not exists %s precision 'ns' vgroups %d", g_stConfInfo.dbName,
×
1380
            g_stConfInfo.producers);
1381
    printf("SQL: %s\n", sql);
×
UNCOV
1382
    queryDbExec(taos, sql, NO_INSERT_TYPE);
×
1383

1384
    sprintf(sql, "create stable %s.%s (ts timestamp, payload binary(%d)) tags (t bigint) ", g_stConfInfo.dbName,
×
1385
            stbName, g_stConfInfo.payloadLen);
1386
    printf("SQL: %s\n", sql);
×
1387
    queryDbExec(taos, sql, NO_INSERT_TYPE);
×
1388

1389
    for (int i = 0; i < g_stConfInfo.producers; i++) {
×
1390
      sprintf(sql, "create table %s.%s%d using %s.stb tags(%d) ", g_stConfInfo.dbName, ctbPrefix, i,
×
1391
              g_stConfInfo.dbName, i);
UNCOV
1392
      printf("SQL: %s\n", sql);
×
UNCOV
1393
      queryDbExec(taos, sql, NO_INSERT_TYPE);
×
1394
    }
1395

1396
    // create topic
UNCOV
1397
    sprintf(sql, "create topic %s as stable %s.%s", g_stConfInfo.topic, g_stConfInfo.dbName, stbName);
×
1398
    printf("SQL: %s\n", sql);
×
UNCOV
1399
    queryDbExec(taos, sql, NO_INSERT_TYPE);
×
1400

1401
    int32_t producerRate = ceil(((double)g_stConfInfo.producerRate) / g_stConfInfo.producers);
×
1402

1403
    printf("==== create %d produce thread ====\n", g_stConfInfo.producers);
×
1404
    for (int32_t i = 0; i < g_stConfInfo.producers; ++i) {
×
1405
      g_stConfInfo.stProdThreads[i].consumerId = i;
×
UNCOV
1406
      g_stConfInfo.stProdThreads[i].producerRate = producerRate;
×
UNCOV
1407
      taosThreadCreate(&(g_stConfInfo.stProdThreads[i].thread), &thattr, ombProduceThreadFunc,
×
1408
                       (void*)(&(g_stConfInfo.stProdThreads[i])));
×
1409
    }
1410

1411
    if (0 == g_stConfInfo.numOfThread) {
×
1412
      int64_t start = taosGetTimestampUs();
×
UNCOV
1413
      for (int32_t i = 0; i < g_stConfInfo.producers; i++) {
×
UNCOV
1414
        taosThreadJoin(g_stConfInfo.stProdThreads[i].thread, NULL);
×
1415
        taosThreadClear(&g_stConfInfo.stProdThreads[i].thread);
×
1416
      }
1417

1418
      printProduceInfo(start);
×
1419

1420
      taosFprintfFile(g_fp, "==== close tmqlog ====\n");
×
UNCOV
1421
      taosCloseFile(&g_fp);
×
UNCOV
1422
      taos_close(taos);
×
1423
      return;
×
1424
    }
1425

UNCOV
1426
    taos_close(taos);
×
1427
  }
1428

1429
  // pthread_create one thread to consume
1430
  taosFprintfFile(g_fp, "==== create %d consume thread ====\n", g_stConfInfo.numOfThread);
×
1431
  for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) {
×
UNCOV
1432
    g_stConfInfo.stThreads[i].consumerId = i;
×
UNCOV
1433
    taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, ombConsumeThreadFunc,
×
1434
                     (void*)(&(g_stConfInfo.stThreads[i])));
×
1435
  }
1436

1437
  int64_t start = taosGetTimestampUs();
×
1438

UNCOV
1439
  for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
×
UNCOV
1440
    taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL);
×
1441
    taosThreadClear(&g_stConfInfo.stThreads[i].thread);
×
1442
  }
1443

1444
  int64_t end = taosGetTimestampUs();
×
1445

1446
  int64_t totalRows = 0;
×
1447
  int64_t totalMsgs = 0;
×
1448
  int64_t totalLenOfMsgs = 0;
×
1449
  for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
×
UNCOV
1450
    totalMsgs += g_stConfInfo.stThreads[i].consumeMsgCnt;
×
UNCOV
1451
    totalLenOfMsgs += g_stConfInfo.stThreads[i].consumeLen;
×
1452
    totalRows += g_stConfInfo.stThreads[i].consumeRowCnt;
×
1453
  }
1454

1455
  int64_t t = end - start;
×
1456
  if (0 == t) t = 1;
×
1457

1458
  double tInMs = (double)t / 1000000.0;
×
1459
  taosFprintfFile(
×
1460
      g_fp, "Spent %.3f seconds to poll msgs: %" PRIu64 " with %d thread(s), throughput: %.3f msgs/s, %.1f MB/s\n\n",
1461
      tInMs, totalMsgs, g_stConfInfo.numOfThread, (double)(totalMsgs / tInMs),
×
UNCOV
1462
      (double)totalLenOfMsgs / (1024 * 1024) / tInMs);
×
1463

1464
  printf("Spent %.3f seconds to cons rows: %" PRIu64 " msgs: %" PRIu64
×
1465
         " with %d thread(s), throughput: %.3f msgs/s, %.1f MB/s\n\n",
1466
         tInMs, totalRows, totalMsgs, g_stConfInfo.numOfThread, (double)(totalMsgs / tInMs),
×
1467
         (double)totalLenOfMsgs / (1024 * 1024) / tInMs);
×
1468

1469
  taosFprintfFile(g_fp, "==== close tmqlog ====\n");
×
UNCOV
1470
  taosCloseFile(&g_fp);
×
1471

UNCOV
1472
  return;
×
1473
}
1474

1475
int main(int32_t argc, char* argv[]) {
67,091✔
1476
  parseArgument(argc, argv);
67,091✔
1477

1478
  if (0 != strlen(g_stConfInfo.topic)) {
67,091✔
UNCOV
1479
    startOmbConsume();
×
UNCOV
1480
    return 0;
×
1481
  }
1482

1483
  int32_t retCode = getConsumeInfo();
67,091✔
1484
  if (0 != retCode) {
67,091✔
UNCOV
1485
    return -1;
×
1486
  }
1487

1488
  saveConfigToLogFile();
67,091✔
1489

1490
  tmqSetSignalHandle();
67,091✔
1491

1492
  TdThreadAttr thattr;
63,087✔
1493
  taosThreadAttrInit(&thattr);
67,091✔
1494
  taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);
67,091✔
1495

1496
  // pthread_create one thread to consume
1497
  taosFprintfFile(g_fp, "==== create %d consume thread ====\n", g_stConfInfo.numOfThread);
67,091✔
1498
  for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) {
140,335✔
1499
    taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, consumeThreadFunc,
73,244✔
1500
                     (void*)(&(g_stConfInfo.stThreads[i])));
73,244✔
1501
  }
1502

1503
  int64_t start = taosGetTimestampUs();
67,091✔
1504

1505
  for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
140,335✔
1506
    taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL);
73,244✔
1507
    taosThreadClear(&g_stConfInfo.stThreads[i].thread);
73,244✔
1508
  }
1509

1510
  int64_t end = taosGetTimestampUs();
67,091✔
1511

1512
  int64_t totalMsgs = 0;
67,091✔
1513
  for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
140,335✔
1514
    totalMsgs += g_stConfInfo.stThreads[i].consumeMsgCnt;
73,244✔
1515
  }
1516

1517
  int64_t t = end - start;
67,091✔
1518
  if (0 == t) t = 1;
67,091✔
1519

1520
  double tInMs = (double)t / 1000000.0;
67,091✔
1521
  taosFprintfFile(g_fp,
67,091✔
1522
                  "Spent %.3f seconds to poll msgs: %" PRIu64 " with %d thread(s), throughput: %.3f msgs/second\n\n",
1523
                  tInMs, totalMsgs, g_stConfInfo.numOfThread, (double)(totalMsgs / tInMs));
63,087✔
1524

1525
  taosFprintfFile(g_fp, "==== close tmqlog ====\n");
67,091✔
1526
  taosCloseFile(&g_fp);
67,091✔
1527

1528
  return 0;
67,091✔
1529
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc