• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4909

30 Dec 2025 10:52AM UTC coverage: 65.542% (+0.2%) from 65.386%
#4909

push

travis-ci

web-flow
enh: drop multi-stream (#33962)

60 of 106 new or added lines in 4 files covered. (56.6%)

857 existing lines in 113 files now uncovered.

193924 of 295877 relevant lines covered (65.54%)

120594206.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

48.29
/utils/test/c/tmqSim.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include <assert.h>
17
#include <math.h>
18
#include <stdio.h>
19
#include <stdlib.h>
20
#include <string.h>
21
#include <sys/stat.h>
22
#include <sys/types.h>
23
#include <time.h>
24

25
#include "taos.h"
26
#include "taosdef.h"
27
#include "taoserror.h"
28
#include "tlog.h"
29
#include "types.h"
30

31
#define GREEN     "\033[1;32m"
32
#define NC        "\033[0m"
33
#define min(a, b) (((a) < (b)) ? (a) : (b))
34

35
#define MAX_SQL_STR_LEN         (1024 * 1024)
36
#define MAX_ROW_STR_LEN         (16 * 1024)
37
#define MAX_CONSUMER_THREAD_CNT (16)
38
#define MAX_VGROUP_CNT          (32)
39
#define SEND_TIME_UNIT          10  // ms
40
#define MAX_SQL_LEN             1048576
41

42
/* Command ids written into the notifyinfo table by notifyMainScript(). */
typedef enum {
  NOTIFY_CMD_START_CONSUM = 0,  // NOTE(review): presumably sent when consuming starts; sender not in this excerpt
  NOTIFY_CMD_START_COMMIT = 1,  // sent once from tmq_commit_cb_print() after the first successful commit
  NOTIFY_CMD_ID_BUTT = 2,       // end marker, not a real command
} NOTIFY_CMD_ID;
47

48
/* Kind of statement being issued; QUERY_TYPE_BUT is the end marker. */
typedef enum enumQUERY_TYPE {
  NO_INSERT_TYPE = 0,
  INSERT_TYPE = 1,
  QUERY_TYPE_BUT = 2,
} QUERY_TYPE;
49

50
typedef struct {
51
  TdThread thread;
52
  int32_t  consumerId;
53

54
  int32_t ifManualCommit;
55
  // int32_t  autoCommitIntervalMs;  // 1000 ms
56
  // char     autoCommit[8];         // true, false
57
  // char     autoOffsetRest[16];    // none, earliest, latest
58

59
  TdFilePtr pConsumeRowsFile;
60
  TdFilePtr pConsumeMetaFile;
61
  int32_t   ifCheckData;
62
  int64_t   expectMsgCnt;
63

64
  int64_t consumeMsgCnt;
65
  int64_t consumeRowCnt;
66
  int64_t consumeLen;
67
  int32_t checkresult;
68

69
  char topicString[1024];
70
  char keyString[1024];
71

72
  int32_t numOfTopic;
73
  char    topics[32][64];
74

75
  int32_t numOfKey;
76
  char    key[32][64];
77
  char    value[32][64];
78

79
  tmq_t*      tmq;
80
  tmq_list_t* topicList;
81

82
  int32_t numOfVgroups;
83
  int32_t rowsOfPerVgroups[MAX_VGROUP_CNT][2];  // [i][0]: vgroup id, [i][1]: rows of consume
84
  int64_t ts;
85

86
  TAOS* taos;
87

88
  // below parameters is used by omb test
89
  int32_t producerRate;  // unit: msgs/s
90
  int64_t totalProduceMsgs;
91
  int64_t totalMsgsLen;
92

93
} SThreadInfo;
94

95
typedef struct {
96
  // input from argvs
97
  char        cdbName[32];
98
  char        dbName[64];
99
  int32_t     showMsgFlag;
100
  int32_t     showRowFlag;
101
  int32_t     saveRowFlag;
102
  int32_t     consumeDelay;  // unit s
103
  int32_t     numOfThread;
104
  int32_t     useSnapshot;
105
  int64_t     nowTime;
106
  SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT];
107

108
  SThreadInfo stProdThreads[MAX_CONSUMER_THREAD_CNT];
109

110
  // below parameters is used by omb test
111
  char    topic[64];
112
  int32_t producers;
113
  int32_t producerRate;
114
  int32_t runDurationMinutes;
115
  int32_t batchSize;
116
  int32_t payloadLen;
117
} SConfInfo;
118

119
static SConfInfo g_stConfInfo;
120
TdFilePtr        g_fp = NULL;
121
static int       running = 1;
122
char*            g_payload = NULL;
123

124
// char* g_pRowValue = NULL;
125
// TdFilePtr g_fp = NULL;
126

127
static void printHelp() {
×
128
  char indent[10] = "        ";
×
129
  printf("Used to test the tmq feature with sim cases\n");
×
130

131
  printf("%s%s\n", indent, "-c");
×
132
  printf("%s%s%s%s\n", indent, indent, "Configuration directory, default is ", configDir);
×
133
  printf("%s%s\n", indent, "-d");
×
134
  printf("%s%s%s\n", indent, indent, "The name of the database for cosumer, no default ");
×
135
  printf("%s%s\n", indent, "-g");
×
136
  printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag);
×
137
  printf("%s%s\n", indent, "-r");
×
138
  printf("%s%s%s%d\n", indent, indent, "showRowFlag, default is ", g_stConfInfo.showRowFlag);
×
139
  printf("%s%s\n", indent, "-s");
×
140
  printf("%s%s%s%d\n", indent, indent, "saveRowFlag, default is ", g_stConfInfo.saveRowFlag);
×
141
  printf("%s%s\n", indent, "-y");
×
142
  printf("%s%s%s%ds\n", indent, indent, "consume delay, default is ", g_stConfInfo.consumeDelay);
×
143
  printf("%s%s\n", indent, "-e");
×
144
  printf("%s%s%s%d\n", indent, indent, "snapshot, default is ", g_stConfInfo.useSnapshot);
×
145

146
  printf("%s%s\n", indent, "-t");
×
147
  printf("%s%s%s\n", indent, indent, "topic name, default is null");
×
148

149
  printf("%s%s\n", indent, "-x");
×
150
  printf("%s%s%s\n", indent, indent, "consume thread number, default is 1");
×
151

152
  printf("%s%s\n", indent, "-l");
×
153
  printf("%s%s%s%d\n", indent, indent, "run duration unit is minutes, default is ", g_stConfInfo.runDurationMinutes);
×
154
  printf("%s%s\n", indent, "-p");
×
155
  printf("%s%s%s\n", indent, indent, "producer thread number, default is 0");
×
156
  printf("%s%s\n", indent, "-b");
×
157
  printf("%s%s%s\n", indent, indent, "batch size, default is 1");
×
158
  printf("%s%s\n", indent, "-i");
×
159
  printf("%s%s%s\n", indent, indent, "produce rate unit is msgs /s, default is 100000");
×
160
  printf("%s%s\n", indent, "-n");
×
161
  printf("%s%s%s\n", indent, indent, "payload len unit is byte, default is 1000");
×
162

163
  exit(EXIT_SUCCESS);
×
164
}
165

166
/*
 * Format the current local time as "YYYY-MM-DD hh:mm:ss" into timeString.
 *
 * timeString: caller-provided buffer; its size is not passed in, so it must be
 *             large enough for the formatted text plus the terminator.
 * Returns timeString so the call can be used directly as a printf argument.
 */
char* getCurrentTimeString(char* timeString) {
  struct tm localNow;
  time_t    nowSec = taosGetTimestampSec();

  taosLocalTime(&nowSec, &localNow, NULL, 0, NULL);

  int year = localNow.tm_year + 1900;
  int month = localNow.tm_mon + 1;
  sprintf(timeString, "%d-%02d-%02d %02d:%02d:%02d", year, month, localNow.tm_mday, localNow.tm_hour,
          localNow.tm_min, localNow.tm_sec);

  return timeString;
}
175

176
static void tmqStop(int signum, void* info, void* ctx) {
×
177
  running = 0;
×
178
  char tmpString[128];
×
179
  taosFprintfFile(g_fp, "%s tmqStop() receive stop signal[%d]\n", getCurrentTimeString(tmpString), signum);
×
180
}
×
181

182
/* Install tmqStop() as the handler for SIGINT. */
static void tmqSetSignalHandle() {
  taosSetSignal(SIGINT, tmqStop);
}
64,262✔
183

184
void initLogFile() {
64,262✔
185
  char filename[256];
60,173✔
186
  char tmpString[128];
60,173✔
187

188
  pid_t process_id = taosGetPId();
64,262✔
189

190
  if (0 != strlen(g_stConfInfo.topic)) {
64,262✔
191
    sprintf(filename, "/tmp/tmqlog-%d-%s.txt", process_id, getCurrentTimeString(tmpString));
×
192
  } else {
193
    sprintf(filename, "%s/../log/tmqlog-%d-%s.txt", configDir, process_id, getCurrentTimeString(tmpString));
64,262✔
194
  }
195
#ifdef WINDOWS
196
  for (int i = 2; i < sizeof(filename); i++) {
197
    if (filename[i] == ':') filename[i] = '-';
198
    if (filename[i] == '\0') break;
199
  }
200
#endif
201
  TdFilePtr pFile = taosOpenFile(filename, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
64,262✔
202
  if (NULL == pFile) {
64,262✔
203
    fprintf(stderr, "Failed to open %s for save result\n", filename);
×
204
    exit(-1);
×
205
  }
206
  g_fp = pFile;
64,262✔
207
}
64,262✔
208

209
void saveConfigToLogFile() {
64,262✔
210
  taosFprintfFile(g_fp, "###################################################################\n");
64,262✔
211
  taosFprintfFile(g_fp, "# configDir:           %s\n", configDir);
64,262✔
212
  taosFprintfFile(g_fp, "# dbName:              %s\n", g_stConfInfo.dbName);
64,262✔
213
  taosFprintfFile(g_fp, "# cdbName:             %s\n", g_stConfInfo.cdbName);
64,262✔
214
  taosFprintfFile(g_fp, "# showMsgFlag:         %d\n", g_stConfInfo.showMsgFlag);
64,262✔
215
  taosFprintfFile(g_fp, "# showRowFlag:         %d\n", g_stConfInfo.showRowFlag);
64,262✔
216
  taosFprintfFile(g_fp, "# saveRowFlag:         %d\n", g_stConfInfo.saveRowFlag);
64,262✔
217
  taosFprintfFile(g_fp, "# consumeDelay:        %d\n", g_stConfInfo.consumeDelay);
64,262✔
218
  taosFprintfFile(g_fp, "# numOfThread:         %d\n", g_stConfInfo.numOfThread);
64,262✔
219

220
  for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
134,303✔
221
    taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId);
70,041✔
222
    // taosFprintfFile(g_fp, "  auto commit:              %s\n", g_stConfInfo.stThreads[i].autoCommit);
223
    // taosFprintfFile(g_fp, "  auto commit interval ms:  %d\n", g_stConfInfo.stThreads[i].autoCommitIntervalMs);
224
    // taosFprintfFile(g_fp, "  auto offset rest:         %s\n", g_stConfInfo.stThreads[i].autoOffsetRest);
225
    taosFprintfFile(g_fp, "  Topics: ");
70,041✔
226
    for (int j = 0; j < g_stConfInfo.stThreads[i].numOfTopic; j++) {
144,198✔
227
      taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[j]);
74,157✔
228
    }
229
    taosFprintfFile(g_fp, "\n");
70,041✔
230
    taosFprintfFile(g_fp, "  Key: ");
70,041✔
231
    for (int k = 0; k < g_stConfInfo.stThreads[i].numOfKey; k++) {
350,205✔
232
      taosFprintfFile(g_fp, "%s:%s, ", g_stConfInfo.stThreads[i].key[k], g_stConfInfo.stThreads[i].value[k]);
280,164✔
233
    }
234
    taosFprintfFile(g_fp, "\n");
70,041✔
235
    taosFprintfFile(g_fp, "  expect rows: %" PRId64 "\n", g_stConfInfo.stThreads[i].expectMsgCnt);
70,041✔
236
  }
237

238
  char tmpString[128];
60,173✔
239
  taosFprintfFile(g_fp, "# Test time:                %s\n", getCurrentTimeString(tmpString));
64,262✔
240
  taosFprintfFile(g_fp, "###################################################################\n");
64,262✔
241
}
64,262✔
242

243
void parseArgument(int32_t argc, char* argv[]) {
64,262✔
244
  memset(&g_stConfInfo, 0, sizeof(SConfInfo));
64,262✔
245
  g_stConfInfo.showMsgFlag = 0;
64,262✔
246
  g_stConfInfo.showRowFlag = 0;
64,262✔
247
  g_stConfInfo.saveRowFlag = 0;
64,262✔
248
  g_stConfInfo.consumeDelay = 5;
64,262✔
249
  g_stConfInfo.numOfThread = 1;
64,262✔
250
  g_stConfInfo.batchSize = 1;
64,262✔
251
  g_stConfInfo.producers = 0;
64,262✔
252

253
  g_stConfInfo.nowTime = taosGetTimestampMs();
64,262✔
254

255
  for (int32_t i = 1; i < argc; i++) {
498,259✔
256
    if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
433,997✔
257
      printHelp();
×
258
      exit(0);
×
259
    } else if (strcmp(argv[i], "-d") == 0) {
433,997✔
260
      tstrncpy(g_stConfInfo.dbName, argv[++i], sizeof(g_stConfInfo.dbName));
64,262✔
261
    } else if (strcmp(argv[i], "-w") == 0) {
369,735✔
262
      tstrncpy(g_stConfInfo.cdbName, argv[++i], sizeof(g_stConfInfo.cdbName));
64,262✔
263
    } else if (strcmp(argv[i], "-c") == 0) {
305,473✔
264
      tstrncpy(configDir, argv[++i], PATH_MAX);
64,262✔
265
    } else if (strcmp(argv[i], "-g") == 0) {
241,211✔
266
      g_stConfInfo.showMsgFlag = atol(argv[++i]);
64,262✔
267
    } else if (strcmp(argv[i], "-r") == 0) {
176,949✔
268
      g_stConfInfo.showRowFlag = atol(argv[++i]);
64,262✔
269
    } else if (strcmp(argv[i], "-s") == 0) {
112,687✔
270
      g_stConfInfo.saveRowFlag = atol(argv[++i]);
×
271
    } else if (strcmp(argv[i], "-y") == 0) {
112,687✔
272
      g_stConfInfo.consumeDelay = atol(argv[++i]);
64,262✔
273
    } else if (strcmp(argv[i], "-e") == 0) {
48,425✔
274
      g_stConfInfo.useSnapshot = atol(argv[++i]);
48,425✔
275
    } else if (strcmp(argv[i], "-t") == 0) {
×
276
      char tmpBuf[56] = {0};
×
277
      tstrncpy(tmpBuf, argv[++i], sizeof(tmpBuf));
×
278
      sprintf(g_stConfInfo.topic, "`%s`", tmpBuf);
×
279
    } else if (strcmp(argv[i], "-x") == 0) {
×
280
      g_stConfInfo.numOfThread = atol(argv[++i]);
×
281
    } else if (strcmp(argv[i], "-l") == 0) {
×
282
      g_stConfInfo.runDurationMinutes = atol(argv[++i]);
×
283
    } else if (strcmp(argv[i], "-p") == 0) {
×
284
      g_stConfInfo.producers = atol(argv[++i]);
×
285
    } else if (strcmp(argv[i], "-b") == 0) {
×
286
      g_stConfInfo.batchSize = atol(argv[++i]);
×
287
    } else if (strcmp(argv[i], "-i") == 0) {
×
288
      g_stConfInfo.producerRate = atol(argv[++i]);
×
289
    } else if (strcmp(argv[i], "-n") == 0) {
×
290
      g_stConfInfo.payloadLen = atol(argv[++i]);
×
291
      if (g_stConfInfo.payloadLen <= 0 || g_stConfInfo.payloadLen > 1024 * 1024 * 1024) {
×
292
        pError("%s calloc size is too large: %s %s", GREEN, argv[++i], NC);
×
293
        exit(-1);
×
294
      }
295
    } else {
296
      pError("%s unknow para: %s %s", GREEN, argv[++i], NC);
×
297
      exit(-1);
×
298
    }
299
  }
300

301
  g_payload = taosMemoryCalloc(g_stConfInfo.payloadLen + 1, 1);
64,262✔
302
  if (NULL == g_payload) {
64,262✔
303
    pPrint("%s failed to malloc for payload %s", GREEN, NC);
×
304
    exit(-1);
×
305
  }
306

307
  for (int32_t i = 0; i < g_stConfInfo.payloadLen; i++) {
64,262✔
308
    strcpy(&g_payload[i], "a");
×
309
  }
310

311
  initLogFile();
64,262✔
312

313
  taosFprintfFile(g_fp, "====parseArgument() success\n");
64,262✔
314

315
#if 1
316
  pPrint("%s configDir:%s %s", GREEN, configDir, NC);
64,262✔
317
  pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC);
64,262✔
318
  pPrint("%s cdbName:%s %s", GREEN, g_stConfInfo.cdbName, NC);
64,262✔
319
  pPrint("%s consumeDelay:%d %s", GREEN, g_stConfInfo.consumeDelay, NC);
64,262✔
320
  pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC);
64,262✔
321
  pPrint("%s showRowFlag:%d %s", GREEN, g_stConfInfo.showRowFlag, NC);
64,262✔
322
  pPrint("%s saveRowFlag:%d %s", GREEN, g_stConfInfo.saveRowFlag, NC);
64,262✔
323

324
  pPrint("%s snapshot:%d %s", GREEN, g_stConfInfo.useSnapshot, NC);
64,262✔
325

326
  pPrint("%s omb topic:%s %s", GREEN, g_stConfInfo.topic, NC);
64,262✔
327
  pPrint("%s numOfThread:%d %s", GREEN, g_stConfInfo.numOfThread, NC);
64,262✔
328
#endif
329
}
64,262✔
330

331
/*
 * Split str in place on any character of del and store a pointer to each
 * token in consecutive slots of arr.
 *
 * str is modified by strtok(). arr must have room for every token; no count
 * is returned and no terminating NULL is written.
 */
void splitStr(char** arr, char* str, const char* del) {
  char** slot = arr;
  for (char* token = strtok(str, del); token != NULL; token = strtok(NULL, del)) {
    *slot = token;
    ++slot;
  }
}
×
338

339
/*
 * Remove leading whitespace from str in place.
 *
 * str: NUL-terminated string; NULL and the empty string are left untouched.
 */
void ltrim(char* str) {
  if (str == NULL || *str == '\0') {
    return;
  }

  const char* p = str;
  // isspace() is only defined for unsigned char values and EOF, so a plain
  // char holding a byte >= 0x80 must be converted first.
  while (*p != '\0' && isspace((unsigned char)*p)) {
    ++p;
  }

  if (p != str) {
    // Regions overlap, hence memmove; +1 also moves the terminator.
    memmove(str, p, strlen(p) + 1);
  }
}
352

353
/*
 * Execute one SQL statement, retrying up to 10 times with a one-second pause
 * after each failure.
 *
 * Returns 0 as soon as one attempt succeeds, -1 (after logging the last error
 * code and the statement) when all attempts fail.
 */
int queryDB(TAOS* taos, char* command) {
  TAOS_RES* pRes = NULL;
  int       code = 0;

  for (int attemptsLeft = 10; attemptsLeft != 0; attemptsLeft--) {
    pRes = taos_query(taos, command);
    code = taos_errno(pRes);
    if (code == 0) {
      taos_free_result(pRes);
      return 0;
    }

    // failed attempt: wait, release the result and try again
    taosSsleep(1);
    taos_free_result(pRes);
    pRes = NULL;
  }

  pError("failed to reason:%s, sql: %s", tstrerror(code), command);
  taos_free_result(pRes);
  return -1;
}
375

376
/*
 * Add `rows` to the per-vgroup row counter of a consumer.
 *
 * If vgroupId is already tracked its counter is increased; otherwise a new
 * entry is appended. When the table already holds MAX_VGROUP_CNT entries the
 * condition is logged, the log file is closed and the process exits with -1.
 */
void addRowsToVgroupId(SThreadInfo* pInfo, int32_t vgroupId, int32_t rows) {
  for (int32_t i = 0; i < pInfo->numOfVgroups; i++) {
    if (vgroupId == pInfo->rowsOfPerVgroups[i][0]) {
      pInfo->rowsOfPerVgroups[i][1] += rows;
      return;
    }
  }

  // Check capacity BEFORE writing: index MAX_VGROUP_CNT is past the end of the array.
  if (pInfo->numOfVgroups >= MAX_VGROUP_CNT) {
    taosFprintfFile(g_fp, "====consume id %d, vgroup num %d over than %d. new vgroupId: %d\n", pInfo->consumerId,
                    pInfo->numOfVgroups + 1, MAX_VGROUP_CNT, vgroupId);
    taosCloseFile(&g_fp);
    exit(-1);
  }

  pInfo->rowsOfPerVgroups[pInfo->numOfVgroups][0] = vgroupId;
  pInfo->rowsOfPerVgroups[pInfo->numOfVgroups][1] += rows;
  pInfo->numOfVgroups++;

  taosFprintfFile(g_fp, "consume id %d, add new vgroupId:%d\n", pInfo->consumerId, vgroupId);
}
397

398
TAOS* createNewTaosConnect() {
134,303✔
399
  TAOS*   taos = NULL;
134,303✔
400
  int32_t retryCnt = 10;
134,303✔
401

402
  while (retryCnt--) {
134,303✔
403
    taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
134,303✔
404
    if (NULL != taos) {
134,303✔
405
      return taos;
134,303✔
406
    }
407
    taosSsleep(1);
×
408
  }
409

410
  taosFprintfFile(g_fp, "taos_connect() fail\n");
×
411
  return NULL;
×
412
}
413

414
/*
 * Insert one consumed row (already rendered as text in buf) into the table
 * "<cdbName>.content_<consumerId>", using pInfo->ts as the row timestamp and
 * incrementing it afterwards.
 *
 * A new connection is opened and closed for every call.
 * Returns 0 on success and -1 when buf is longer than 1024 bytes, the
 * statement does not fit the SQL buffer, or the connection cannot be opened.
 * A failed insert is fatal: the log is closed and the process exits with -1.
 */
int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) {
  // Sized for the worst case: 1024 bytes of row text plus the statement
  // prefix with a full cdbName, the consumer id and a 64-bit timestamp.
  char sqlStr[1280] = {0};

  size_t rowLen = strlen(buf);
  if (rowLen > 1024) {
    // This path returns to the caller, so the shared log file stays open.
    taosFprintfFile(g_fp, "The length of one row[%d] is overflow 1024\n", (int)rowLen);
    return -1;
  }

  TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
  if (pConn == NULL) {
    taosFprintfFile(g_fp, "taos_connect() fail, can not save consume result to main script\n");
    return -1;
  }

  int sqlLen = snprintf(sqlStr, sizeof(sqlStr), "insert into %s.content_%d values (%" PRId64 ", \'%s\')",
                        g_stConfInfo.cdbName, pInfo->consumerId, pInfo->ts++, buf);
  if (sqlLen < 0 || (size_t)sqlLen >= sizeof(sqlStr)) {
    taosFprintfFile(g_fp, "sql for save consume content is too long\n");
    taos_close(pConn);
    return -1;
  }

  int retCode = queryDB(pConn, sqlStr);
  if (retCode != 0) {
    taosFprintfFile(g_fp, "error in save consume content\n");
    taosCloseFile(&g_fp);
    taos_close(pConn);
    exit(-1);
  }

  taos_close(pConn);

  return 0;
}
443

444
static char* shellFormatTimestamp(char* buf, int32_t bufSize, int64_t val, int32_t precision) {
2,147,483,647✔
445
  // if (shell.args.is_raw_time) {
446
  //   sprintf(buf, "%" PRId64, val);
447
  //   return buf;
448
  // }
449

450
  time_t  tt;
2,147,483,647✔
451
  int32_t ms = 0;
2,147,483,647✔
452
  if (precision == TSDB_TIME_PRECISION_NANO) {
2,147,483,647✔
453
    tt = (time_t)(val / 1000000000);
13,150,000✔
454
    ms = val % 1000000000;
13,150,000✔
455
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
2,147,483,647✔
456
    tt = (time_t)(val / 1000000);
13,400,000✔
457
    ms = val % 1000000;
13,400,000✔
458
  } else {
459
    tt = (time_t)(val / 1000);
2,147,483,647✔
460
    ms = val % 1000;
2,147,483,647✔
461
  }
462

463
  if (tt <= 0 && ms < 0) {
2,147,483,647✔
464
    tt--;
×
465
    if (precision == TSDB_TIME_PRECISION_NANO) {
×
466
      ms += 1000000000;
×
467
    } else if (precision == TSDB_TIME_PRECISION_MICRO) {
×
468
      ms += 1000000;
×
469
    } else {
470
      ms += 1000;
×
471
    }
472
  }
473

474
  struct tm ptm;
2,147,483,647✔
475
  if (taosLocalTime(&tt, &ptm, buf, bufSize, NULL) == NULL) {
2,147,483,647✔
476
    return buf;
×
477
  }
478
  size_t pos = taosStrfTime(buf, 35, "%Y-%m-%d %H:%M:%S", &ptm);
2,147,483,647✔
479

480
  if (precision == TSDB_TIME_PRECISION_NANO) {
2,147,483,647✔
481
    sprintf(buf + pos, ".%09d", ms);
13,114,232✔
482
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
2,147,483,647✔
483
    sprintf(buf + pos, ".%06d", ms);
13,400,000✔
484
  } else {
485
    sprintf(buf + pos, ".%03d", ms);
2,147,483,647✔
486
  }
487

488
  return buf;
2,147,483,647✔
489
}
490

491
/*
 * Write one field value to pFile in text form.
 *
 * val:       raw field data, or NULL (written as the text NULL).
 * field:     column description; field->type selects the rendering.
 * length:    byte length of val for string-like types, minimum width for doubles.
 * precision: timestamp precision, forwarded to shellFormatTimestamp().
 *
 * String-like types and timestamps are wrapped in double quotes; embedded
 * double quotes are doubled. Types not listed in the switch produce no output.
 */
static void shellDumpFieldToFile(TdFilePtr pFile, const char* val, TAOS_FIELD* field, int32_t length,
                                 int32_t precision) {
  if (val == NULL) {
    taosFprintfFile(pFile, "NULL");
    return;
  }

  char quotationStr[2];
  quotationStr[0] = '\"';
  quotationStr[1] = 0;

  char buf[TSDB_MAX_BYTES_PER_ROW];
  switch (field->type) {
    case TSDB_DATA_TYPE_BOOL:
      taosFprintfFile(pFile, "%d", ((((int32_t)(*((char*)val))) == 1) ? 1 : 0));
      break;
    case TSDB_DATA_TYPE_TINYINT:
      taosFprintfFile(pFile, "%d", *((int8_t*)val));
      break;
    case TSDB_DATA_TYPE_UTINYINT:
      taosFprintfFile(pFile, "%u", *((uint8_t*)val));
      break;
    case TSDB_DATA_TYPE_SMALLINT:
      taosFprintfFile(pFile, "%d", *((int16_t*)val));
      break;
    case TSDB_DATA_TYPE_USMALLINT:
      taosFprintfFile(pFile, "%u", *((uint16_t*)val));
      break;
    case TSDB_DATA_TYPE_INT:
      taosFprintfFile(pFile, "%d", *((int32_t*)val));
      break;
    case TSDB_DATA_TYPE_UINT:
      taosFprintfFile(pFile, "%u", *((uint32_t*)val));
      break;
    case TSDB_DATA_TYPE_BIGINT:
      taosFprintfFile(pFile, "%" PRId64, *((int64_t*)val));
      break;
    case TSDB_DATA_TYPE_UBIGINT:
      taosFprintfFile(pFile, "%" PRIu64, *((uint64_t*)val));
      break;
    case TSDB_DATA_TYPE_FLOAT:
      taosFprintfFile(pFile, "%.5f", GET_FLOAT_VAL(val));
      break;
    case TSDB_DATA_TYPE_DOUBLE: {
      // Fixed notation first; fall back to scientific when the text gets too wide.
      int n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.9f", length, GET_DOUBLE_VAL(val));
      if (n > TMAX(25, length)) {
        taosFprintfFile(pFile, "%*.15e", length, GET_DOUBLE_VAL(val));
      } else {
        taosFprintfFile(pFile, "%s", buf);
      }
    } break;
    case TSDB_DATA_TYPE_BINARY:
    case TSDB_DATA_TYPE_VARBINARY:
    case TSDB_DATA_TYPE_NCHAR:
    case TSDB_DATA_TYPE_JSON:
    case TSDB_DATA_TYPE_GEOMETRY: {
      const int32_t bufCap = (int32_t)sizeof(buf);
      int32_t       bufIndex = 0;

      taosFprintfFile(pFile, "%s", quotationStr);
      for (int32_t i = 0; i < length; i++) {
        // Doubling quotes can need up to 2 * length bytes. Flush the buffer
        // whenever fewer than two data bytes plus the terminator would fit.
        if (bufIndex + 2 >= bufCap) {
          buf[bufIndex] = 0;
          taosFprintfFile(pFile, "%s", buf);
          bufIndex = 0;
        }
        buf[bufIndex] = val[i];
        bufIndex++;
        if (val[i] == '\"') {
          buf[bufIndex] = val[i];
          bufIndex++;
        }
      }
      buf[bufIndex] = 0;

      taosFprintfFile(pFile, "%s%s", buf, quotationStr);
    } break;
    case TSDB_DATA_TYPE_TIMESTAMP:
      shellFormatTimestamp(buf, sizeof(buf), *(int64_t*)val, precision);
      taosFprintfFile(pFile, "%s%s%s", quotationStr, buf, quotationStr);
      break;
    default:
      break;
  }
}
569

570
/*
 * Write one result row to pFile as a single comma-separated line.
 * Each of the num_fields columns is rendered by shellDumpFieldToFile().
 */
static void dumpToFileForCheck(TdFilePtr pFile, TAOS_ROW row, TAOS_FIELD* fields, int32_t* length, int32_t num_fields,
                               int32_t precision) {
  int32_t col = 0;
  while (col < num_fields) {
    if (col != 0) {
      taosFprintfFile(pFile, ",");
    }
    const char* cell = (const char*)row[col];
    shellDumpFieldToFile(pFile, cell, &fields[col], length[col], precision);
    ++col;
  }
  taosFprintfFile(pFile, "\n");
}
2,147,483,647✔
580

581
/*
 * Process one data message returned by the consumer poll.
 *
 * Logs the message header, dumps every row to pInfo->pConsumeRowsFile, logs
 * the rendered row when showRowFlag is set, and finally adds the number of
 * rows to the per-vgroup counter of the consumer.
 *
 * Returns the number of rows contained in the message.
 */
static int32_t data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex) {
  char    buf[MAX_ROW_STR_LEN];
  int32_t totalRows = 0;

  int32_t     vgroupId = tmq_get_vgroup_id(msg);
  const char* dbName = tmq_get_db_name(msg);

  taosFprintfFile(g_fp, "consumerId: %d, msg index:%d\n", pInfo->consumerId, msgIndex);

  // Rows consumed so far from this vgroup; 0 for a vgroup not seen before.
  // (Indexing the table with numOfVgroups would read past the used entries.)
  int32_t currentRows = 0;
  for (int32_t index = 0; index < pInfo->numOfVgroups; index++) {
    if (vgroupId == pInfo->rowsOfPerVgroups[index][0]) {
      currentRows = pInfo->rowsOfPerVgroups[index][1];
      break;
    }
  }

  taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId:%d, currentRows:%d\n", dbName != NULL ? dbName : "invalid table",
                  tmq_get_topic_name(msg), vgroupId, currentRows);

  while (1) {
    TAOS_ROW row = taos_fetch_row(msg);
    if (row == NULL) {
      break;
    }

    TAOS_FIELD* fields = taos_fetch_fields(msg);
    int32_t     numOfFields = taos_field_count(msg);
    int32_t*    length = taos_fetch_lengths(msg);
    int32_t     precision = taos_result_precision(msg);
    const char* tbName = tmq_get_table_name(msg);

    dumpToFileForCheck(pInfo->pConsumeRowsFile, row, fields, length, numOfFields, precision);

    if (0 != g_stConfInfo.showRowFlag) {
      // Render only when the text is actually logged, and with a size limit:
      // taos_print_row() has no way to know the capacity of buf.
      buf[0] = 0;
      taos_print_row_with_size(buf, (uint32_t)sizeof(buf), row, fields, numOfFields);
      taosFprintfFile(g_fp, "time:%" PRId64 " tbname:%s, rows[%d]: %s\n", taosGetTimestampMs(),
                      (tbName != NULL ? tbName : "null table"), totalRows, buf);
    }

    totalRows++;
  }

  addRowsToVgroupId(pInfo, vgroupId, totalRows);
  return totalRows;
}
637

638
/*
 * Process one meta message returned by the consumer poll.
 *
 * Logs the message header, fetches the raw form of the message (only to
 * exercise tmq_get_raw) and appends the json form of the meta, when not
 * empty, to pInfo->pConsumeMetaFile.
 *
 * Always returns 1: a meta message counts as a single row.
 */
static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex) {
  int32_t     vgroupId = tmq_get_vgroup_id(msg);
  const char* dbName = tmq_get_db_name(msg);

  taosFprintfFile(g_fp, "consumerId: %d, msg index:%d\n", pInfo->consumerId, msgIndex);
  taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId: %d\n", dbName != NULL ? dbName : "invalid table",
                  tmq_get_topic_name(msg), vgroupId);

  tmq_raw_data raw = {0};
  if (tmq_get_raw(msg, &raw) == TSDB_CODE_SUCCESS) {
    // The raw data is not written anywhere; release it so it does not leak.
    tmq_free_raw(raw);
  }

  char* result = tmq_get_json_meta(msg);
  if (result && strcmp(result, "") != 0) {
    taosFprintfFile(pInfo->pConsumeMetaFile, "%s\n", result);
  }
  tmq_free_json_meta(result);

  return 1;
}
678

679
/*
 * Completion callback for the fire-and-forget taos_query_a() call in
 * notifyMainScript(). The outcome is ignored; the callback only releases the
 * result object handed to it, which nothing else frees.
 */
static void appNothing(void* param, TAOS_RES* res, int32_t numOfRows) {
  (void)param;
  (void)numOfRows;
  taos_free_result(res);
}
119,148✔
680

681
/*
 * Tell the main test script that an event happened by inserting a row
 * (timestamp, cmdId, consumerId) into "<cdbName>.notifyinfo".
 *
 * The timestamp comes from the shared counter g_stConfInfo.nowTime, which is
 * atomically incremented on every use. The insert is issued asynchronously
 * and its outcome is not checked; the "success" log line only records that
 * the statement was submitted.
 *
 * Always returns 0.
 */
int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
  char    sqlStr[1024] = {0};
  int64_t rowTs = atomic_fetch_add_64(&g_stConfInfo.nowTime, 1);

  sprintf(sqlStr, "insert into %s.notifyinfo values (%" PRId64 ", %d, %d)", g_stConfInfo.cdbName, rowTs, cmdId,
          pInfo->consumerId);

  taos_query_a(pInfo->taos, sqlStr, appNothing, NULL);

  taosFprintfFile(g_fp, "notifyMainScript success, sql: %s\n", sqlStr);

  return 0;
}
694

695
static int32_t g_once_commit_flag = 0;
696

697
static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
3,556,835✔
698
  taosFprintfFile(g_fp, "tmq_commit_cb_print() commit %d\n", code);
3,556,835✔
699

700
  if (0 == g_once_commit_flag && code == 0) {
3,556,835✔
701
    g_once_commit_flag = 1;
54,326✔
702
    notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT);
54,326✔
703
  }
704

705
  char tmpString[128];
3,549,200✔
706
  taosFprintfFile(g_fp, "%s tmq_commit_cb_print() be called\n", getCurrentTimeString(tmpString));
3,556,835✔
707
}
3,556,835✔
708

709
/*
 * Create the tmq consumer of one thread and store it in pInfo->tmq.
 *
 * Configuration order: fixed credentials (root/taosdata), the commit
 * callback, every key/value pair of pInfo, "msg.with.table.name", and, when
 * snapshot mode is requested, "experimental.snapshot.enable".
 * The result of tmq_consumer_new() is stored without a NULL check.
 */
void build_consumer(SThreadInfo* pInfo) {
  tmq_conf_t* pConf = tmq_conf_new();

  tmq_conf_set(pConf, "td.connect.user", "root");
  tmq_conf_set(pConf, "td.connect.pass", "taosdata");

  tmq_conf_set_auto_commit_cb(pConf, tmq_commit_cb_print, pInfo);

  // per-consumer settings supplied by the test case
  int32_t keyIdx = 0;
  while (keyIdx < pInfo->numOfKey) {
    tmq_conf_set(pConf, pInfo->key[keyIdx], pInfo->value[keyIdx]);
    ++keyIdx;
  }

  tmq_conf_set(pConf, "msg.with.table.name", "true");

  if (0 != g_stConfInfo.useSnapshot) {
    tmq_conf_set(pConf, "experimental.snapshot.enable", "true");
  }

  pInfo->tmq = tmq_consumer_new(pConf, NULL, 0);
  tmq_conf_destroy(pConf);
}
747

748
/*
 * Create pInfo->topicList and append the pInfo->numOfTopic configured topic
 * names to it, in order.
 */
void build_topic_list(SThreadInfo* pInfo) {
  tmq_list_t* pList = tmq_list_new();
  pInfo->topicList = pList;

  int32_t topicIdx = 0;
  while (topicIdx < pInfo->numOfTopic) {
    tmq_list_append(pInfo->topicList, pInfo->topics[topicIdx]);
    ++topicIdx;
  }
}
756

757
/*
 * Store the final counters of one consumer in "<cdbName>.consumeresult".
 *
 * Inserted values, in order: timestamp (taken from the shared, atomically
 * incremented g_stConfInfo.nowTime), consumer id, consumed message count,
 * consumed row count, check result.
 *
 * Returns 0 on success, -1 when the insert failed after the retries done by
 * queryDB().
 */
int32_t saveConsumeResult(SThreadInfo* pInfo) {
  char    sqlStr[1024] = {0};
  char    timeBuf[128];
  int64_t rowTs = atomic_fetch_add_64(&g_stConfInfo.nowTime, 1);

  sprintf(sqlStr, "insert into %s.consumeresult values (%" PRId64 ", %d, %" PRId64 ", %" PRId64 ", %d)",
          g_stConfInfo.cdbName, rowTs, pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt,
          pInfo->checkresult);

  taosFprintfFile(g_fp, "%s, consume id %d result: %s\n", getCurrentTimeString(timeBuf), pInfo->consumerId, sqlStr);

  if (queryDB(pInfo->taos, sqlStr) != 0) {
    taosFprintfFile(g_fp, "consume id %d error in save consume result\n", pInfo->consumerId);
    return -1;
  }

  return 0;
}
775

776
// Poll messages for one consumer until a stop condition is reached.
//
// The loop ends when:
//   - the global `running` flag is cleared,
//   - tmq_consumer_poll() returns no message within the configured delay, or
//   - the number of consumed rows or messages reaches pInfo->expectMsgCnt.
//
// When pInfo->ifCheckData is set, two output files are opened under
// <configDir>/../log before polling starts (rows and meta); if either cannot
// be opened the function logs the failure, releases whatever was opened and
// returns without consuming. Messages are only decoded (and rows counted)
// when g_stConfInfo.showMsgFlag is non-zero.
//
// On exit the totals are stored in pInfo->consumeMsgCnt / consumeRowCnt and
// both output files are closed.
void loop_consume(SThreadInfo* pInfo) {
  int32_t once_flag = 0;

  int64_t totalMsgs = 0;
  int64_t totalRows = 0;

  char tmpString[128];
  taosFprintfFile(g_fp, "%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString),
                  pInfo->consumerId);

  pInfo->ts = taosGetTimestampMs();

  if (pInfo->ifCheckData) {
    char filename[256] = {0};
    memset(tmpString, 0, tListLen(tmpString));

    snprintf(filename, sizeof(filename), "%s/../log/consumerid_%d.txt", configDir, pInfo->consumerId);
    pInfo->pConsumeRowsFile = taosOpenFile(filename, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);

    snprintf(filename, sizeof(filename), "%s/../log/meta_consumerid_%d.txt", configDir, pInfo->consumerId);
    pInfo->pConsumeMetaFile = taosOpenFile(filename, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);

    if (pInfo->pConsumeRowsFile == NULL || pInfo->pConsumeMetaFile == NULL) {
      taosFprintfFile(g_fp, "%s create file fail for save rows or save meta\n", getCurrentTimeString(tmpString));
      // one of the two may have been opened successfully; do not leak it
      if (pInfo->pConsumeRowsFile != NULL) {
        taosCloseFile(&pInfo->pConsumeRowsFile);
      }
      if (pInfo->pConsumeMetaFile != NULL) {
        taosCloseFile(&pInfo->pConsumeMetaFile);
      }
      return;
    }
  }

  int64_t lastTotalMsgs = 0;
  int64_t lastPrintTime = taosGetTimestampMs();

  // consumeDelay is configured in seconds; -1 is passed through unchanged
  int32_t consumeDelay = g_stConfInfo.consumeDelay == -1 ? -1 : (g_stConfInfo.consumeDelay * 1000);
  while (running) {
    TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, consumeDelay);
    if (NULL == tmqMsg) {
      memset(tmpString, 0, tListLen(tmpString));
      taosFprintfFile(g_fp, "%s no poll more msg when time over, break consume\n", getCurrentTimeString(tmpString));
      break;
    }

    if (0 != g_stConfInfo.showMsgFlag) {
      tmq_res_t msgType = tmq_get_res_type(tmqMsg);
      if (msgType == TMQ_RES_TABLE_META) {
        totalRows += meta_msg_process(tmqMsg, pInfo, totalMsgs);
      } else if (msgType == TMQ_RES_DATA) {
        totalRows += data_msg_process(tmqMsg, pInfo, totalMsgs);
      } else if (msgType == TMQ_RES_METADATA) {
        // mixed message: only the data part contributes to the row count
        meta_msg_process(tmqMsg, pInfo, totalMsgs);
        totalRows += data_msg_process(tmqMsg, pInfo, totalMsgs);
      }
    }

    taos_free_result(tmqMsg);
    totalMsgs++;

    // progress report, at most once every 10 seconds
    int64_t currentPrintTime = taosGetTimestampMs();
    int64_t elapsed = currentPrintTime - lastPrintTime;
    if (elapsed > 10 * 1000) {
      taosFprintfFile(
          g_fp, "consumer id %d has currently poll total msgs: %" PRId64 ", period rate: %.3f msgs/second\n",
          pInfo->consumerId, totalMsgs, (totalMsgs - lastTotalMsgs) * 1000.0 / elapsed);
      lastPrintTime = currentPrintTime;
      lastTotalMsgs = totalMsgs;
    }

    // tell the driving script, once, that the first message has arrived
    if (0 == once_flag) {
      once_flag = 1;
      notifyMainScript(pInfo, NOTIFY_CMD_START_CONSUM);
    }

    if ((totalRows >= pInfo->expectMsgCnt) || (totalMsgs >= pInfo->expectMsgCnt)) {
      memset(tmpString, 0, tListLen(tmpString));
      taosFprintfFile(g_fp, "%s over than expect rows, so break consume\n", getCurrentTimeString(tmpString));
      break;
    }
  }

  if (0 == running) {
    taosFprintfFile(g_fp, "receive stop signal and not continue consume\n");
  }

  pInfo->consumeMsgCnt = totalMsgs;
  pInfo->consumeRowCnt = totalRows;

  taosFprintfFile(g_fp, "==== consumerId: %d, consumeMsgCnt: %" PRId64 ", consumeRowCnt: %" PRId64 "\n",
                  pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt);

  if (pInfo->pConsumeRowsFile != NULL) {
    if (taosFsyncFile(pInfo->pConsumeRowsFile) < 0) {
      printf("taosFsyncFile error:%s", strerror(errno));
    }
    taosCloseFile(&pInfo->pConsumeRowsFile);
  }

  if (pInfo->pConsumeMetaFile != NULL) {
    taosCloseFile(&pInfo->pConsumeMetaFile);
  }
}
872

873
void* consumeThreadFunc(void* param) {
70,041✔
874
  SThreadInfo* pInfo = (SThreadInfo*)param;
70,041✔
875

876
  pInfo->taos = createNewTaosConnect();
70,041✔
877
  if (pInfo->taos == NULL) {
70,041✔
878
    taosFprintfFile(g_fp, "taos_connect() fail, can not notify and save consume result to main scripte\n");
×
879
    return NULL;
×
880
  }
881

882
  build_consumer(pInfo);
70,041✔
883
  build_topic_list(pInfo);
70,041✔
884
  if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) {
70,041✔
UNCOV
885
    taosFprintfFile(g_fp, "create consumer fail! tmq is null or topicList is null\n");
×
886
    taos_close(pInfo->taos);
×
887
    pInfo->taos = NULL;
×
888
    return NULL;
×
889
  }
890

891
  int32_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList);
70,041✔
892
  if (err != 0) {
70,041✔
893
    pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
×
894
    taosFprintfFile(g_fp, "tmq_subscribe() fail! reason: %s\n", tmq_err2str(err));
×
895
    taos_close(pInfo->taos);
×
896
    pInfo->taos = NULL;
×
897
    return NULL;
×
898
  }
899

900
  tmq_list_destroy(pInfo->topicList);
70,041✔
901
  pInfo->topicList = NULL;
70,041✔
902

903
  loop_consume(pInfo);
70,041✔
904

905
  if (pInfo->ifManualCommit) {
70,041✔
906
    pPrint("tmq_commit() manual commit when consume end.\n");
56,717✔
907
    /*tmq_commit(pInfo->tmq, NULL, 0);*/
908
    tmq_commit_sync(pInfo->tmq, NULL);
56,717✔
909
    tmq_commit_cb_print(pInfo->tmq, 0, pInfo);
56,717✔
910
    taosFprintfFile(g_fp, "tmq_commit() manual commit over.\n");
56,717✔
911
    pPrint("tmq_commit() manual commit over.\n");
56,717✔
912
  }
913

914
  err = tmq_unsubscribe(pInfo->tmq);
70,041✔
915
  if (err != 0) {
70,041✔
916
    pError("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
×
917
    taosFprintfFile(g_fp, "tmq_unsubscribe()! reason: %s\n", tmq_err2str(err));
×
918
  }
919

920
  err = tmq_consumer_close(pInfo->tmq);
70,041✔
921
  if (err != 0) {
70,041✔
922
    pError("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
×
923
    taosFprintfFile(g_fp, "tmq_consumer_close()! reason: %s\n", tmq_err2str(err));
×
924
  }
925
  pInfo->tmq = NULL;
70,041✔
926

927
  // save consume result into consumeresult table
928
  saveConsumeResult(pInfo);
70,041✔
929

930
  // save rows from per vgroup
931
  taosFprintfFile(g_fp, "======== consumerId: %d, consume rows from per vgroups ========\n", pInfo->consumerId);
70,041✔
932
  for (int32_t i = 0; i < pInfo->numOfVgroups; i++) {
230,236✔
933
    taosFprintfFile(g_fp, "vgroups: %04d, rows: %d\n", pInfo->rowsOfPerVgroups[i][0], pInfo->rowsOfPerVgroups[i][1]);
160,195✔
934
  }
935

936
  taos_close(pInfo->taos);
70,041✔
937
  pInfo->taos = NULL;
70,041✔
938

939
  return NULL;
70,041✔
940
}
941

942
void parseConsumeInfo() {
64,262✔
943
  char*      token;
944
  const char delim[2] = ",";
64,262✔
945
  const char ch = ':';
64,262✔
946

947
  for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
134,303✔
948
    token = strtok(g_stConfInfo.stThreads[i].topicString, delim);
70,041✔
949
    while (token != NULL) {
144,198✔
950
      // printf("%s\n", token );
951
      tstrncpy(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic], token,
74,157✔
952
               sizeof(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]));
953
      ltrim(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]);
74,157✔
954
      // printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
955
      g_stConfInfo.stThreads[i].numOfTopic++;
74,157✔
956

957
      token = strtok(NULL, delim);
74,157✔
958
    }
959

960
    token = strtok(g_stConfInfo.stThreads[i].keyString, delim);
70,041✔
961
    while (token != NULL) {
350,205✔
962
      // printf("%s\n", token );
963
      {
964
        char* pstr = token;
280,164✔
965
        ltrim(pstr);
280,164✔
966
        char* ret = strchr(pstr, ch);
280,164✔
967
        memcpy(g_stConfInfo.stThreads[i].key[g_stConfInfo.stThreads[i].numOfKey], pstr, ret - pstr);
280,164✔
968
        tstrncpy(g_stConfInfo.stThreads[i].value[g_stConfInfo.stThreads[i].numOfKey], ret + 1,
280,164✔
969
                 sizeof(g_stConfInfo.stThreads[i].value[g_stConfInfo.stThreads[i].numOfKey]));
970
        // printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey],
971
        // g_stConfInfo.value[g_stConfInfo.numOfKey]);
972
        g_stConfInfo.stThreads[i].numOfKey++;
280,164✔
973
      }
974

975
      token = strtok(NULL, delim);
280,164✔
976
    }
977
  }
978
}
64,262✔
979

980
int32_t getConsumeInfo() {
64,262✔
981
  char sqlStr[1024] = {0};
64,262✔
982

983
  TAOS* pConn = createNewTaosConnect();
64,262✔
984
  if (pConn == NULL) {
64,262✔
985
    taosFprintfFile(g_fp, "taos_connect() fail, can not get consume info for start consumer\n");
×
986
    return -1;
×
987
  }
988

989
  sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.cdbName);
64,262✔
990
  TAOS_RES* pRes = taos_query(pConn, sqlStr);
64,262✔
991
  if (taos_errno(pRes) != 0) {
64,262✔
992
    taosFprintfFile(g_fp, "error in get consumeinfo for %s\n", taos_errstr(pRes));
×
993
    taosCloseFile(&g_fp);
×
994
    taos_free_result(pRes);
×
995
    taos_close(pConn);
×
996
    return -1;
×
997
  }
998

999
  TAOS_ROW    row = NULL;
64,262✔
1000
  int         num_fields = taos_num_fields(pRes);
64,262✔
1001
  TAOS_FIELD* fields = taos_fetch_fields(pRes);
64,262✔
1002

1003
  // schema: ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint,
1004
  // ifcheckdata int
1005

1006
  int32_t numOfThread = 0;
64,262✔
1007
  while ((row = taos_fetch_row(pRes))) {
134,303✔
1008
    int32_t* lengths = taos_fetch_lengths(pRes);
70,041✔
1009

1010
    // set default value
1011
    // g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = 5000;
1012
    // memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, "true", strlen("true"));
1013
    // memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, "earlieast", strlen("earlieast"));
1014

1015
    for (int i = 0; i < num_fields; ++i) {
559,338✔
1016
      if (row[i] == NULL || 0 == i) {
489,297✔
1017
        continue;
70,041✔
1018
      }
1019

1020
      if ((1 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
419,256✔
1021
        g_stConfInfo.stThreads[numOfThread].consumerId = *((int32_t*)row[i]);
70,041✔
1022
      } else if ((2 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
349,215✔
1023
        memcpy(g_stConfInfo.stThreads[numOfThread].topicString, row[i], lengths[i]);
70,041✔
1024
      } else if ((3 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
279,174✔
1025
        memcpy(g_stConfInfo.stThreads[numOfThread].keyString, row[i], lengths[i]);
70,041✔
1026
      } else if ((4 == i) && (fields[i].type == TSDB_DATA_TYPE_BIGINT)) {
209,133✔
1027
        g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t*)row[i]);
70,041✔
1028
      } else if ((5 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
139,092✔
1029
        g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t*)row[i]);
70,041✔
1030
      } else if ((6 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
69,051✔
1031
        g_stConfInfo.stThreads[numOfThread].ifManualCommit = *((int32_t*)row[i]);
69,051✔
1032
      }
1033
    }
1034
    numOfThread++;
70,041✔
1035
  }
1036
  g_stConfInfo.numOfThread = numOfThread;
64,262✔
1037

1038
  taos_free_result(pRes);
64,262✔
1039

1040
  parseConsumeInfo();
64,262✔
1041
  taos_close(pConn);
64,262✔
1042

1043
  return 0;
64,262✔
1044
}
1045

1046
// Drain all rows of one polled message and measure their printed size.
//
// Every row is rendered with taos_print_row() into a local buffer and the
// length of the rendered text is accumulated. `pInfo` and `msgIndex` are not
// used by this function.
//
// Stores the accumulated text length in *lenOfRows and returns the number of
// rows fetched from `msg`.
static int32_t omb_data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex, int64_t* lenOfRows) {
  char    buf[16 * 1024];
  int32_t totalRows = 0;
  int64_t totalLen = 0;  // 64-bit, matching *lenOfRows

  for (;;) {
    TAOS_ROW row = taos_fetch_row(msg);
    if (row == NULL) {
      break;
    }

    TAOS_FIELD* fields = taos_fetch_fields(msg);
    int32_t     numOfFields = taos_field_count(msg);

    taos_print_row(buf, row, fields, numOfFields);
    totalLen += (int64_t)strlen(buf);
    totalRows++;
  }

  *lenOfRows = totalLen;
  return totalRows;
}
1078

1079
// Poll loop used by the OMB-style benchmark consumer.
//
// Polls until the global `running` flag is cleared or a poll returns no
// message within the configured delay. Every message is drained through
// omb_data_msg_process(); message, row and byte totals are accumulated and a
// throughput line is printed (stdout and log file) at most every 10 seconds,
// plus one final line on stdout when polling times out.
//
// The totals are stored in pInfo->consumeMsgCnt, consumeRowCnt and consumeLen.
void omb_loop_consume(SThreadInfo* pInfo) {
  const double bytesPerMB = 1024.0 * 1024.0;

  int64_t totalMsgs = 0;
  int64_t totalRows = 0;

  char tmpString[128];
  taosFprintfFile(g_fp, "%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString),
                  pInfo->consumerId);
  printf("%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString), pInfo->consumerId);

  pInfo->ts = taosGetTimestampMs();

  int64_t lastTotalMsgs = 0;
  int64_t lastPrintTime = taosGetTimestampMs();

  int64_t totalLenOfMsg = 0;
  int64_t lastTotalLenOfMsg = 0;
  // consumeDelay is configured in seconds; -1 is passed through unchanged
  int32_t consumeDelay = g_stConfInfo.consumeDelay == -1 ? -1 : (g_stConfInfo.consumeDelay * 1000);
  while (running) {
    TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, consumeDelay);
    if (NULL == tmqMsg) {
      memset(tmpString, 0, tListLen(tmpString));
      taosFprintfFile(g_fp, "%s no poll more msg when time over, break consume\n", getCurrentTimeString(tmpString));
      printf("%s no poll more msg when time over, break consume\n", getCurrentTimeString(tmpString));

      int64_t currentPrintTime = taosGetTimestampMs();
      int64_t currentLenOfMsg = totalLenOfMsg - lastTotalLenOfMsg;
      int64_t deltaTime = currentPrintTime - lastPrintTime;
      if (deltaTime <= 0) {
        deltaTime = 1;  // avoid dividing by a zero-length interval
      }
      printf("consumer id %d has currently cons total rows: %" PRId64 ", msgs: %" PRId64
             ", rate: %.3f msgs/s, %.1f MB/s\n",
             pInfo->consumerId, totalRows, totalMsgs, (totalMsgs - lastTotalMsgs) * 1000.0 / deltaTime,
             currentLenOfMsg * 1000.0 / bytesPerMB / deltaTime);
      break;
    }

    int64_t lenOfMsg = 0;
    totalRows += omb_data_msg_process(tmqMsg, pInfo, totalMsgs, &lenOfMsg);
    totalLenOfMsg += lenOfMsg;
    taos_free_result(tmqMsg);
    totalMsgs++;

    int64_t currentPrintTime = taosGetTimestampMs();
    int64_t deltaTime = currentPrintTime - lastPrintTime;
    if (deltaTime > 10 * 1000) {
      int64_t currentLenOfMsg = totalLenOfMsg - lastTotalLenOfMsg;
      double  msgRate = (totalMsgs - lastTotalMsgs) * 1000.0 / deltaTime;
      double  mbRate = currentLenOfMsg * 1000.0 / bytesPerMB / deltaTime;

      printf("consumer id %d has currently cons total rows: %" PRId64 ", msgs: %" PRId64
             ", rate: %.3f msgs/s, %.1f MB/s\n",
             pInfo->consumerId, totalRows, totalMsgs, msgRate, mbRate);

      // same MB/s figure as on stdout
      taosFprintfFile(g_fp,
                      "consumer id %d has currently poll total msgs: %" PRId64
                      ", period cons rate: %.3f msgs/s, %.1f MB/s\n",
                      pInfo->consumerId, totalMsgs, msgRate, mbRate);
      lastPrintTime = currentPrintTime;
      lastTotalMsgs = totalMsgs;
      lastTotalLenOfMsg = totalLenOfMsg;
    }
  }

  pInfo->consumeMsgCnt = totalMsgs;
  pInfo->consumeRowCnt = totalRows;
  pInfo->consumeLen = totalLenOfMsg;
}
×
1146

1147
void* ombConsumeThreadFunc(void* param) {
×
1148
  SThreadInfo* pInfo = (SThreadInfo*)param;
×
1149

1150
  //################### set key ########################
1151
  tmq_conf_t* conf = tmq_conf_new();
×
1152
  // tmq_conf_set(conf, "td.connect.ip", "localhost");
1153
  // tmq_conf_set(conf, "td.connect.port", "6030");
1154
  tmq_conf_set(conf, "td.connect.user", "root");
×
1155
  tmq_conf_set(conf, "td.connect.pass", "taosdata");
×
1156
  // tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
1157
  tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, pInfo);
×
1158
  tmq_conf_set(conf, "group.id", "ombCgrp");
×
1159
  // tmq_conf_set(conf, "msg.with.table.name", "true");
1160
  // tmq_conf_set(conf, "client.id", "c-001");
1161
  // tmq_conf_set(conf, "enable.auto.commit", "true");
1162
  tmq_conf_set(conf, "enable.auto.commit", "false");
×
1163
  // tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
1164
  // tmq_conf_set(conf, "auto.offset.reset", "none");
1165
  // tmq_conf_set(conf, "auto.offset.reset", "earliest");
1166
  tmq_conf_set(conf, "auto.offset.reset", "earliest");
×
1167
  //
1168
  if (g_stConfInfo.useSnapshot) {
×
1169
    tmq_conf_set(conf, "experimental.snapshot.enable", "true");
×
1170
  }
1171

1172
  pInfo->tmq = tmq_consumer_new(conf, NULL, 0);
×
1173

1174
  tmq_conf_destroy(conf);
×
1175

1176
  //################### set topic ##########################
1177
  pInfo->topicList = tmq_list_new();
×
1178
  tmq_list_append(pInfo->topicList, g_stConfInfo.topic);
×
1179

1180
  if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) {
×
1181
    taosFprintfFile(g_fp, "create consumer fail! tmq is null or topicList is null\n");
×
1182
    return NULL;
×
1183
  }
1184

1185
  int32_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList);
×
1186
  if (err != 0) {
×
1187
    pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
×
1188
    taosFprintfFile(g_fp, "tmq_subscribe() fail! reason: %s\n", tmq_err2str(err));
×
1189
    return NULL;
×
1190
  }
1191

1192
  tmq_list_destroy(pInfo->topicList);
×
1193
  pInfo->topicList = NULL;
×
1194

1195
  omb_loop_consume(pInfo);
×
1196

1197
  err = tmq_unsubscribe(pInfo->tmq);
×
1198
  if (err != 0) {
×
1199
    pError("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
×
1200
    taosFprintfFile(g_fp, "tmq_unsubscribe()! reason: %s\n", tmq_err2str(err));
×
1201
  }
1202

1203
  err = tmq_consumer_close(pInfo->tmq);
×
1204
  if (err != 0) {
×
1205
    pError("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
×
1206
    taosFprintfFile(g_fp, "tmq_consumer_close()! reason: %s\n", tmq_err2str(err));
×
1207
  }
1208
  pInfo->tmq = NULL;
×
1209

1210
  return NULL;
×
1211
}
1212

1213
// Run one SQL statement and release its result set.
//
// Returns -1 when the statement fails (the reason is printed), the number of
// affected rows when `type` is INSERT_TYPE, and 0 for any other statement
// type that succeeds.
static int queryDbExec(TAOS* taos, char* command, QUERY_TYPE type) {
  TAOS_RES* pResult = taos_query(taos, command);
  int       ret = 0;

  if (taos_errno(pResult) != 0) {
    pPrint("%s Failed to execute <%s>, reason: %s %s", GREEN, command, taos_errstr(pResult), NC);
    ret = -1;
  } else if (INSERT_TYPE == type) {
    ret = taos_affected_rows(pResult);
  }

  // the result handle is released on every path
  taos_free_result(pResult);
  return ret;
}
1232

1233
void* ombProduceThreadFunc(void* param) {
×
1234
  SThreadInfo* pInfo = (SThreadInfo*)param;
×
1235

1236
  pInfo->taos = createNewTaosConnect();
×
1237
  if (pInfo->taos == NULL) {
×
1238
    taosFprintfFile(g_fp, "taos_connect() fail, can not start producers!\n");
×
1239
    return NULL;
×
1240
  }
1241

1242
  int64_t affectedRowsTotal = 0;
×
1243
  int64_t sendMsgs = 0;
×
1244

1245
  uint32_t totalSendLoopTimes =
×
1246
      g_stConfInfo.runDurationMinutes * 60 * 1000 / SEND_TIME_UNIT;  // send some msgs per 10ms
×
1247
  uint32_t batchPerTblTimes = pInfo->producerRate / 100 / g_stConfInfo.batchSize;
×
1248
  uint32_t remainder = (pInfo->producerRate / 100) % g_stConfInfo.batchSize;
×
1249
  if (remainder) {
×
1250
    batchPerTblTimes += 1;
×
1251
  }
1252

1253
  char* sqlBuf = taosMemoryMalloc(MAX_SQL_LEN);
×
1254
  if (NULL == sqlBuf) {
×
1255
    printf("malloc fail for sqlBuf\n");
×
1256
    taos_close(pInfo->taos);
×
1257
    pInfo->taos = NULL;
×
1258
    return NULL;
×
1259
  }
1260

1261
  printf("Produce Info: totalSendLoopTimes: %d, batchPerTblTimes: %d, producerRate: %d\n", totalSendLoopTimes,
×
1262
         batchPerTblTimes, pInfo->producerRate);
1263

1264
  char ctbName[128] = {0};
×
1265
  sprintf(ctbName, "%s.ctb%d", g_stConfInfo.dbName, pInfo->consumerId);
×
1266

1267
  int64_t lastPrintTime = taosGetTimestampUs();
×
1268
  int64_t totalMsgLen = 0;
×
1269
  // int64_t timeStamp = taosGetTimestampUs();
1270
  while (totalSendLoopTimes) {
×
1271
    int64_t startTs = taosGetTimestampUs();
×
1272
    for (int i = 0; i < batchPerTblTimes; ++i) {
×
1273
      uint32_t msgsOfSql = g_stConfInfo.batchSize;
×
1274
      if ((i == batchPerTblTimes - 1) && (0 != remainder)) {
×
1275
        msgsOfSql = remainder;
×
1276
      }
1277
      int len = 0;
×
1278
      len += tsnprintf(sqlBuf + len, MAX_SQL_LEN - len, "insert into %s values ", ctbName);
×
1279
      for (int j = 0; j < msgsOfSql; j++) {
×
1280
        int64_t timeStamp = taosGetTimestampNs();
×
1281
        len += tsnprintf(sqlBuf + len, MAX_SQL_LEN - len, "(%" PRId64 ", \"%s\")", timeStamp, g_payload);
×
1282
        sendMsgs++;
×
1283
        pInfo->totalProduceMsgs++;
×
1284
      }
1285

1286
      totalMsgLen += len;
×
1287
      pInfo->totalMsgsLen += len;
×
1288

1289
      int64_t affectedRows = queryDbExec(pInfo->taos, sqlBuf, INSERT_TYPE);
×
1290
      if (affectedRows < 0) {
×
1291
        taos_close(pInfo->taos);
×
1292
        pInfo->taos = NULL;
×
1293
        taosMemoryFree(sqlBuf);
×
1294
        return NULL;
×
1295
      }
1296

1297
      affectedRowsTotal += affectedRows;
×
1298

1299
      // printf("Produce Info: affectedRows: %" PRId64 "\n", affectedRows);
1300
    }
1301
    totalSendLoopTimes -= 1;
×
1302

1303
    // calc spent time
1304
    int64_t currentTs = taosGetTimestampUs();
×
1305
    int64_t delta = currentTs - startTs;
×
1306
    if (delta < SEND_TIME_UNIT * 1000) {
×
1307
      int64_t sleepLen = (int32_t)(SEND_TIME_UNIT * 1000 - delta);
×
1308
      // printf("sleep %" PRId64 " us, use time: %" PRId64 " us\n", sleepLen, delta);
1309
      taosUsleep((int32_t)sleepLen);
×
1310
    }
1311

1312
    currentTs = taosGetTimestampUs();
×
1313
    delta = currentTs - lastPrintTime;
×
1314
    if (delta > 10 * 1000 * 1000) {
×
1315
      printf("producer[%d] info: %" PRId64 " msgs, %" PRId64 " Byte, %" PRId64 " us, totalSendLoopTimes: %d\n",
×
1316
             pInfo->consumerId, sendMsgs, totalMsgLen, delta, totalSendLoopTimes);
1317
      printf("producer[%d] rate: %1.f msgs/s, %1.f KB/s\n", pInfo->consumerId, sendMsgs * 1000.0 * 1000 / delta,
×
1318
             (totalMsgLen / 1024.0) / (delta / (1000 * 1000)));
×
1319
      lastPrintTime = currentTs;
×
1320
      sendMsgs = 0;
×
1321
      totalMsgLen = 0;
×
1322
    }
1323
  }
1324

1325
  printf("affectedRowsTotal: %" PRId64 "\n", affectedRowsTotal);
×
1326
  taos_close(pInfo->taos);
×
1327
  pInfo->taos = NULL;
×
1328
  taosMemoryFree(sqlBuf);
×
1329
  return NULL;
×
1330
}
1331

1332
void printProduceInfo(int64_t start) {
×
1333
  int64_t totalMsgs = 0;
×
1334
  int64_t totalLenOfMsgs = 0;
×
1335
  for (int i = 0; i < g_stConfInfo.producers; i++) {
×
1336
    totalMsgs += g_stConfInfo.stProdThreads[i].totalProduceMsgs;
×
1337
    totalLenOfMsgs += g_stConfInfo.stProdThreads[i].totalMsgsLen;
×
1338
  }
1339

1340
  int64_t end = taosGetTimestampUs();
×
1341

1342
  int64_t t = end - start;
×
1343
  if (0 == t) t = 1;
×
1344

1345
  double tInMs = (double)t / 1000000.0;
×
1346
  printf("Spent %.3f seconds to prod %" PRIu64 " msgs, %" PRIu64 " Byte\n\n", tInMs, totalMsgs, totalLenOfMsgs);
×
1347

1348
  printf("Spent %.3f seconds to prod %" PRIu64 " msgs with %d producer(s), throughput: %.3f msgs/s, %.1f MB/s\n\n",
×
1349
         tInMs, totalMsgs, g_stConfInfo.producers, (double)totalMsgs / tInMs,
×
1350
         (double)totalLenOfMsgs / (1024.0 * 1024) / tInMs);
×
1351
  return;
×
1352
}
1353

1354
void startOmbConsume() {
×
1355
  TdThreadAttr thattr;
×
1356
  taosThreadAttrInit(&thattr);
×
1357
  taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);
×
1358

1359
  if (0 != g_stConfInfo.producers) {
×
1360
    TAOS* taos = createNewTaosConnect();
×
1361
    if (taos == NULL) {
×
1362
      taosFprintfFile(g_fp, "taos_connect() fail, can not create db, stbl, ctbl, topic!\n");
×
1363
      return;
×
1364
    }
1365

1366
    char stbName[16] = "stb";
×
1367
    char ctbPrefix[16] = "ctb";
×
1368

1369
    char sql[256] = {0};
×
1370
    sprintf(sql, "drop database if exists %s", g_stConfInfo.dbName);
×
1371
    printf("SQL: %s\n", sql);
×
1372
    queryDbExec(taos, sql, NO_INSERT_TYPE);
×
1373

1374
    sprintf(sql, "create database if not exists %s precision 'ns' vgroups %d", g_stConfInfo.dbName,
×
1375
            g_stConfInfo.producers);
1376
    printf("SQL: %s\n", sql);
×
1377
    queryDbExec(taos, sql, NO_INSERT_TYPE);
×
1378

1379
    sprintf(sql, "create stable %s.%s (ts timestamp, payload binary(%d)) tags (t bigint) ", g_stConfInfo.dbName,
×
1380
            stbName, g_stConfInfo.payloadLen);
1381
    printf("SQL: %s\n", sql);
×
1382
    queryDbExec(taos, sql, NO_INSERT_TYPE);
×
1383

1384
    for (int i = 0; i < g_stConfInfo.producers; i++) {
×
1385
      sprintf(sql, "create table %s.%s%d using %s.stb tags(%d) ", g_stConfInfo.dbName, ctbPrefix, i,
×
1386
              g_stConfInfo.dbName, i);
1387
      printf("SQL: %s\n", sql);
×
1388
      queryDbExec(taos, sql, NO_INSERT_TYPE);
×
1389
    }
1390

1391
    // create topic
1392
    sprintf(sql, "create topic %s as stable %s.%s", g_stConfInfo.topic, g_stConfInfo.dbName, stbName);
×
1393
    printf("SQL: %s\n", sql);
×
1394
    queryDbExec(taos, sql, NO_INSERT_TYPE);
×
1395

1396
    int32_t producerRate = ceil(((double)g_stConfInfo.producerRate) / g_stConfInfo.producers);
×
1397

1398
    printf("==== create %d produce thread ====\n", g_stConfInfo.producers);
×
1399
    for (int32_t i = 0; i < g_stConfInfo.producers; ++i) {
×
1400
      g_stConfInfo.stProdThreads[i].consumerId = i;
×
1401
      g_stConfInfo.stProdThreads[i].producerRate = producerRate;
×
1402
      taosThreadCreate(&(g_stConfInfo.stProdThreads[i].thread), &thattr, ombProduceThreadFunc,
×
1403
                       (void*)(&(g_stConfInfo.stProdThreads[i])));
×
1404
    }
1405

1406
    if (0 == g_stConfInfo.numOfThread) {
×
1407
      int64_t start = taosGetTimestampUs();
×
1408
      for (int32_t i = 0; i < g_stConfInfo.producers; i++) {
×
1409
        taosThreadJoin(g_stConfInfo.stProdThreads[i].thread, NULL);
×
1410
        taosThreadClear(&g_stConfInfo.stProdThreads[i].thread);
×
1411
      }
1412

1413
      printProduceInfo(start);
×
1414

1415
      taosFprintfFile(g_fp, "==== close tmqlog ====\n");
×
1416
      taosCloseFile(&g_fp);
×
1417
      taos_close(taos);
×
1418
      return;
×
1419
    }
1420

1421
    taos_close(taos);
×
1422
  }
1423

1424
  // pthread_create one thread to consume
1425
  taosFprintfFile(g_fp, "==== create %d consume thread ====\n", g_stConfInfo.numOfThread);
×
1426
  for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) {
×
1427
    g_stConfInfo.stThreads[i].consumerId = i;
×
1428
    taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, ombConsumeThreadFunc,
×
1429
                     (void*)(&(g_stConfInfo.stThreads[i])));
×
1430
  }
1431

1432
  int64_t start = taosGetTimestampUs();
×
1433

1434
  for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
×
1435
    taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL);
×
1436
    taosThreadClear(&g_stConfInfo.stThreads[i].thread);
×
1437
  }
1438

1439
  int64_t end = taosGetTimestampUs();
×
1440

1441
  int64_t totalRows = 0;
×
1442
  int64_t totalMsgs = 0;
×
1443
  int64_t totalLenOfMsgs = 0;
×
1444
  for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
×
1445
    totalMsgs += g_stConfInfo.stThreads[i].consumeMsgCnt;
×
1446
    totalLenOfMsgs += g_stConfInfo.stThreads[i].consumeLen;
×
1447
    totalRows += g_stConfInfo.stThreads[i].consumeRowCnt;
×
1448
  }
1449

1450
  int64_t t = end - start;
×
1451
  if (0 == t) t = 1;
×
1452

1453
  double tInMs = (double)t / 1000000.0;
×
1454
  taosFprintfFile(
×
1455
      g_fp, "Spent %.3f seconds to poll msgs: %" PRIu64 " with %d thread(s), throughput: %.3f msgs/s, %.1f MB/s\n\n",
1456
      tInMs, totalMsgs, g_stConfInfo.numOfThread, (double)(totalMsgs / tInMs),
×
1457
      (double)totalLenOfMsgs / (1024 * 1024) / tInMs);
×
1458

1459
  printf("Spent %.3f seconds to cons rows: %" PRIu64 " msgs: %" PRIu64
×
1460
         " with %d thread(s), throughput: %.3f msgs/s, %.1f MB/s\n\n",
1461
         tInMs, totalRows, totalMsgs, g_stConfInfo.numOfThread, (double)(totalMsgs / tInMs),
×
1462
         (double)totalLenOfMsgs / (1024 * 1024) / tInMs);
×
1463

1464
  taosFprintfFile(g_fp, "==== close tmqlog ====\n");
×
1465
  taosCloseFile(&g_fp);
×
1466

1467
  return;
×
1468
}
1469

1470
int main(int32_t argc, char* argv[]) {
64,262✔
1471
  parseArgument(argc, argv);
64,262✔
1472

1473
  if (0 != strlen(g_stConfInfo.topic)) {
64,262✔
1474
    startOmbConsume();
×
1475
    return 0;
×
1476
  }
1477

1478
  int32_t retCode = getConsumeInfo();
64,262✔
1479
  if (0 != retCode) {
64,262✔
1480
    return -1;
×
1481
  }
1482

1483
  saveConfigToLogFile();
64,262✔
1484

1485
  tmqSetSignalHandle();
64,262✔
1486

1487
  TdThreadAttr thattr;
60,173✔
1488
  taosThreadAttrInit(&thattr);
64,262✔
1489
  taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);
64,262✔
1490

1491
  // pthread_create one thread to consume
1492
  taosFprintfFile(g_fp, "==== create %d consume thread ====\n", g_stConfInfo.numOfThread);
64,262✔
1493
  for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) {
134,303✔
1494
    taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, consumeThreadFunc,
70,041✔
1495
                     (void*)(&(g_stConfInfo.stThreads[i])));
70,041✔
1496
  }
1497

1498
  int64_t start = taosGetTimestampUs();
64,262✔
1499

1500
  for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
134,303✔
1501
    taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL);
70,041✔
1502
    taosThreadClear(&g_stConfInfo.stThreads[i].thread);
70,041✔
1503
  }
1504

1505
  int64_t end = taosGetTimestampUs();
64,262✔
1506

1507
  int64_t totalMsgs = 0;
64,262✔
1508
  for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
134,303✔
1509
    totalMsgs += g_stConfInfo.stThreads[i].consumeMsgCnt;
70,041✔
1510
  }
1511

1512
  int64_t t = end - start;
64,262✔
1513
  if (0 == t) t = 1;
64,262✔
1514

1515
  double tInMs = (double)t / 1000000.0;
64,262✔
1516
  taosFprintfFile(g_fp,
64,262✔
1517
                  "Spent %.3f seconds to poll msgs: %" PRIu64 " with %d thread(s), throughput: %.3f msgs/second\n\n",
1518
                  tInMs, totalMsgs, g_stConfInfo.numOfThread, (double)(totalMsgs / tInMs));
60,173✔
1519

1520
  taosFprintfFile(g_fp, "==== close tmqlog ====\n");
64,262✔
1521
  taosCloseFile(&g_fp);
64,262✔
1522

1523
  return 0;
64,262✔
1524
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc