• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / taos-tools / 13135713772

04 Feb 2025 12:42PM UTC coverage: 75.074% (-0.03%) from 75.104%
13135713772

Pull #843

github

web-flow
Merge 7e4333581 into 85e3589c2
Pull Request #843: taos-tools Merge 3.0 to Main Branch

433 of 593 new or added lines in 6 files covered. (73.02%)

41 existing lines in 6 files now uncovered.

12457 of 16593 relevant lines covered (75.07%)

327996.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.34
/src/benchInsert.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the MIT license as published by the Free Software
6
 * Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 */
12

13
#include <bench.h>
14
#include "benchLog.h"
15
#include "wrapDb.h"
16
#include <benchData.h>
17
#include <benchInsertMix.h>
18

19
static int32_t stmt2BindAndSubmit(
20
        threadInfo *pThreadInfo,
21
        SChildTable *childTbl,
22
        int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt, int64_t *delay1,
23
        int64_t *delay3, int64_t* startTs, int64_t* endTs, int32_t w);
24
TAOS_STMT2* initStmt2(TAOS* taos, bool single);        
25

26
#define FREE_PIDS_INFOS_RETURN_MINUS_1()            \
27
    do {                                            \
28
        tmfree(pids);                               \
29
        tmfree(infos);                              \
30
        return -1;                                  \
31
    } while (0)
32

33
#define FREE_RESOURCE()                             \
34
    do {                                            \
35
        if (pThreadInfo->conn)                      \
36
            closeBenchConn(pThreadInfo->conn);      \
37
        benchArrayDestroy(pThreadInfo->delayList);  \
38
        tmfree(pids);                               \
39
        tmfree(infos);                              \
40
    } while (0)                                     \
41

42
static int getSuperTableFromServerRest(
3✔
43
    SDataBase* database, SSuperTable* stbInfo, char *command) {
44

45
    // TODO(zero): it will create super table based on this error code.
46
    return TSDB_CODE_NOT_FOUND;
3✔
47
    // TODO(me): finish full implementation
48
#if 0
49
    int sockfd = createSockFd();
50
    if (sockfd < 0) {
51
        return -1;
52
    }
53

54
    int code = postProceSql(command,
55
                         database->dbName,
56
                         database->precision,
57
                         REST_IFACE,
58
                         0,
59
                         g_arguments->port,
60
                         false,
61
                         sockfd,
62
                         NULL);
63

64
    destroySockFd(sockfd);
65
#endif   // 0
66
}
67

68
static int getSuperTableFromServerTaosc(
142✔
69
        SDataBase *database, SSuperTable *stbInfo, char *command) {
70
    TAOS_RES *res;
71
    TAOS_ROW row = NULL;
142✔
72
    SBenchConn *conn = initBenchConn();
142✔
73
    if (NULL == conn) {
142✔
74
        return TSDB_CODE_FAILED;
×
75
    }
76

77
    res = taos_query(conn->taos, command);
142✔
78
    int32_t code = taos_errno(res);
142✔
79
    if (code != 0) {
142✔
80
        infoPrint("stable %s does not exist, will create one\n",
140✔
81
                  stbInfo->stbName);
82
        closeBenchConn(conn);
140✔
83
        return TSDB_CODE_NOT_FOUND;
140✔
84
    }
85
    infoPrint("find stable<%s>, will get meta data from server\n",
2✔
86
              stbInfo->stbName);
87

88
    // Check if the the existing super table matches exactly with the definitions in the JSON file.
89
    // If a hash table were used, the time complexity would be only O(n).
90
    // But taosBenchmark does not incorporate a hash table, the time complexity of the loop traversal is O(n^2).
91
    bool isTitleRow = true;
2✔
92
    uint32_t tag_count = 0;
2✔
93
    uint32_t col_count = 0;
2✔
94
    while ((row = taos_fetch_row(res)) != NULL) {
11✔
95
        if (isTitleRow) {
9✔
96
            isTitleRow = false;
2✔
97
            continue;
2✔
98
        }
99
        int32_t *lengths = taos_fetch_lengths(res);
7✔
100
        if (lengths == NULL) {
7✔
101
            errorPrint("%s", "failed to execute taos_fetch_length\n");
×
102
            taos_free_result(res);
×
103
            closeBenchConn(conn);
×
104
            return TSDB_CODE_FAILED;
×
105
        }
106
        if (strncasecmp((char *) row[TSDB_DESCRIBE_METRIC_NOTE_INDEX], "tag",
7✔
107
                        strlen("tag")) == 0) {
108
            if (stbInfo->tags == NULL || stbInfo->tags->size == 0 || tag_count >= stbInfo->tags->size) {
3✔
109
                errorPrint("%s", "existing stable tag mismatched with that defined in JSON\n");
×
110
                taos_free_result(res);
×
111
                closeBenchConn(conn);
×
112
                return TSDB_CODE_FAILED;
×
113
            }
114
            uint8_t tagType = convertStringToDatatype(
6✔
115
                    (char *) row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
3✔
116
                    lengths[TSDB_DESCRIBE_METRIC_TYPE_INDEX]);
3✔
117
            char *tagName = (char *) row[TSDB_DESCRIBE_METRIC_FIELD_INDEX];
3✔
118
            if (!searchBArray(stbInfo->tags, tagName,
3✔
119
                              lengths[TSDB_DESCRIBE_METRIC_FIELD_INDEX], tagType)) {
120
                errorPrint("existing stable tag:%s not found in JSON tags.\n", tagName);
×
121
                taos_free_result(res);
×
122
                closeBenchConn(conn);
×
123
                return TSDB_CODE_FAILED;
×
124
            }
125
            tag_count += 1;
3✔
126
        } else {
127
            if (stbInfo->cols == NULL || stbInfo->cols->size == 0 || col_count >= stbInfo->cols->size) {
4✔
128
                errorPrint("%s", "existing stable column mismatched with that defined in JSON\n");
×
129
                taos_free_result(res);
×
130
                closeBenchConn(conn);
×
131
                return TSDB_CODE_FAILED;
×
132
            }
133
            uint8_t colType = convertStringToDatatype(
8✔
134
                    (char *) row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
4✔
135
                    lengths[TSDB_DESCRIBE_METRIC_TYPE_INDEX]);
4✔
136
            char * colName = (char *) row[TSDB_DESCRIBE_METRIC_FIELD_INDEX];
4✔
137
            if (!searchBArray(stbInfo->cols, colName,
4✔
138
                              lengths[TSDB_DESCRIBE_METRIC_FIELD_INDEX], colType)) {
139
                errorPrint("existing stable col:%s not found in JSON cols.\n", colName);
×
140
                taos_free_result(res);
×
141
                closeBenchConn(conn);
×
142
                return TSDB_CODE_FAILED;
×
143
            }
144
            col_count += 1;
4✔
145
        }
146
    }  // end while
147
    taos_free_result(res);
2✔
148
    closeBenchConn(conn);
2✔
149
    if (tag_count != stbInfo->tags->size) {
2✔
150
        errorPrint("%s", "existing stable tag mismatched with that defined in JSON\n");
×
151
        return TSDB_CODE_FAILED;
×
152
    }
153

154
    if (col_count != stbInfo->cols->size) {
2✔
155
        errorPrint("%s", "existing stable column mismatched with that defined in JSON\n");
×
156
        return TSDB_CODE_FAILED;
×
157
    }
158

159
    return TSDB_CODE_SUCCESS;
2✔
160
}
161

162

163
static int getSuperTableFromServer(SDataBase* database, SSuperTable* stbInfo) {
148✔
164
#ifdef WEBSOCKET
165
    if (g_arguments->websocket) {
148✔
166
        return 0;
3✔
167
    }
168
#endif
169
    int ret = 0;
145✔
170
    char command[SHORT_1K_SQL_BUFF_LEN] = "\0";
145✔
171
    snprintf(command, SHORT_1K_SQL_BUFF_LEN,
145✔
172
             "DESCRIBE `%s`.`%s`", database->dbName,
173
             stbInfo->stbName);
174

175
    if (REST_IFACE == stbInfo->iface) {
145✔
176
        ret = getSuperTableFromServerRest(database, stbInfo, command);
3✔
177
    } else {
178
        ret = getSuperTableFromServerTaosc(database, stbInfo, command);
142✔
179
    }
180

181
    return ret;
145✔
182
}
183

184
static int queryDbExec(SDataBase *database,
142✔
185
                       SSuperTable *stbInfo, char *command) {
186
    int ret = 0;
142✔
187
    if (isRest(stbInfo->iface)) {
142✔
188
        if (0 != convertServAddr(stbInfo->iface, false, 1)) {
1✔
189
            errorPrint("%s", "Failed to convert server address\n");
×
190
            return -1;
×
191
        }
192
        int sockfd = createSockFd();
1✔
193
        if (sockfd < 0) {
1✔
194
            ret = -1;
×
195
        } else {
196
            ret = queryDbExecRest(command,
1✔
197
                              database->dbName,
198
                              database->precision,
199
                              stbInfo->iface,
1✔
200
                              stbInfo->lineProtocol,
1✔
201
                              stbInfo->tcpTransfer,
1✔
202
                              sockfd);
203
            destroySockFd(sockfd);
1✔
204
        }
205
    } else {
206
        SBenchConn* conn = initBenchConn();
141✔
207
        if (NULL == conn) {
141✔
208
            ret = -1;
×
209
        } else {
210
            ret = queryDbExecCall(conn, command);
141✔
211
            int32_t trying = g_arguments->keep_trying;
141✔
212
            while (ret && trying) {
141✔
213
                infoPrint("will sleep %"PRIu32" milliseconds then re-execute command: %s\n",
×
214
                          g_arguments->trying_interval, command);
215
                toolsMsleep(g_arguments->trying_interval);
×
216
                ret = queryDbExecCall(conn, command);
×
217
                if (trying != -1) {
×
218
                    trying--;
×
219
                }
220
            }
221
            if (0 != ret) {
141✔
222
                ret = -1;
×
223
            }
224
            closeBenchConn(conn);
141✔
225
        }
226
    }
227

228
    return ret;
142✔
229
}
230

231
#ifdef WEBSOCKET
232
static void dropSuperTable(SDataBase* database, SSuperTable* stbInfo) {
3✔
233
    char command[SHORT_1K_SQL_BUFF_LEN] = "\0";
3✔
234
    snprintf(command, sizeof(command),
3✔
235
        g_arguments->escape_character
3✔
236
            ? "DROP TABLE IF EXISTS `%s`.`%s`"
237
            : "DROP TABLE IF EXISTS %s.%s",
238
             database->dbName,
239
             stbInfo->stbName);
240

241
    infoPrint("drop stable: <%s>\n", command);
3✔
242
    queryDbExec(database, stbInfo, command);
3✔
243

244
    return;
3✔
245
}
246
#endif  // WEBSOCKET
247

248
int getCompressStr(Field* col, char* buf) {
857✔
249
    int pos = 0;
857✔
250
    if(strlen(col->encode) > 0) {
857✔
251
        pos +=sprintf(buf + pos, "encode \'%s\' ", col->encode);
×
252
    }
253
    if(strlen(col->compress) > 0) {
857✔
254
        pos +=sprintf(buf + pos, "compress \'%s\' ", col->compress);
×
255
    }
256
    if(strlen(col->level) > 0) {
857✔
257
        pos +=sprintf(buf + pos, "level \'%s\' ", col->level);
×
258
    }
259

260
    return pos;
857✔
261
}
262

263
static int createSuperTable(SDataBase* database, SSuperTable* stbInfo) {
148✔
264
    if (g_arguments->supplementInsert) {
148✔
265
        return 0;
1✔
266
    }
267

268
    uint32_t col_buffer_len = (TSDB_COL_NAME_LEN + 15 + COMP_NAME_LEN*3) * stbInfo->cols->size;
147✔
269
    char         *colsBuf = benchCalloc(1, col_buffer_len, false);
147✔
270
    char*         command = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, false);
147✔
271
    int          len = 0;
147✔
272

273
    for (int colIndex = 0; colIndex < stbInfo->cols->size; colIndex++) {
1,004✔
274
        Field * col = benchArrayGet(stbInfo->cols, colIndex);
857✔
275
        int n;
276
        if (col->type == TSDB_DATA_TYPE_BINARY ||
857✔
277
            col->type == TSDB_DATA_TYPE_NCHAR ||
669✔
278
            col->type == TSDB_DATA_TYPE_VARBINARY ||
644✔
279
            col->type == TSDB_DATA_TYPE_GEOMETRY) {
644✔
280
            n = snprintf(colsBuf + len, col_buffer_len - len,
426✔
281
                    ",%s %s(%d)", col->name,
213✔
282
                    convertDatatypeToString(col->type), col->length);
213✔
283
            if (col->type == TSDB_DATA_TYPE_GEOMETRY && col->length < 21) {
213✔
284
                errorPrint("%s() LN%d, geometry filed len must be greater than 21 on %d\n", __func__, __LINE__,
×
285
                           colIndex);
286
                return -1;
×
287
            }
288
        } else {
289
            n = snprintf(colsBuf + len, col_buffer_len - len,
1,288✔
290
                    ",%s %s", col->name,
644✔
291
                    convertDatatypeToString(col->type));
644✔
292
        }
293

294
        // primary key
295
        if(stbInfo->primary_key && colIndex == 0) {
857✔
296
            len += n;
1✔
297
            n = snprintf(colsBuf + len, col_buffer_len - len, " %s", PRIMARY_KEY);
1✔
298
        }
299

300
        // compress key
301
        char keys[COMP_NAME_LEN*3] = "";
857✔
302
        if (getCompressStr(col, keys) > 0) {
857✔
303
            len += n;
×
304
            n = snprintf(colsBuf + len, col_buffer_len - len, " %s", keys);
×
305
        }
306

307
        if (n < 0 || n >= col_buffer_len - len) {
857✔
308
            errorPrint("%s() LN%d, snprintf overflow on %d\n",
×
309
                       __func__, __LINE__, colIndex);
310
            break;
×
311
        } else {
312
            len += n;
857✔
313
        }
314
    }
315

316
    // save for creating child table
317
    stbInfo->colsOfCreateChildTable =
147✔
318
        (char *)benchCalloc(len + TIMESTAMP_BUFF_LEN, 1, true);
147✔
319

320
    snprintf(stbInfo->colsOfCreateChildTable, len + TIMESTAMP_BUFF_LEN,
147✔
321
             "(ts timestamp%s)", colsBuf);
322

323
    if (stbInfo->tags->size == 0) {
147✔
324
        free(colsBuf);
8✔
325
        free(command);
8✔
326
        return 0;
8✔
327
    }
328

329
    uint32_t tag_buffer_len = (TSDB_COL_NAME_LEN + 15) * stbInfo->tags->size;
139✔
330
    char *tagsBuf = benchCalloc(1, tag_buffer_len, false);
139✔
331
    int  tagIndex;
332
    len = 0;
139✔
333

334
    int n;
335
    n = snprintf(tagsBuf + len, tag_buffer_len - len, "(");
139✔
336
    if (n < 0 || n >= tag_buffer_len - len) {
139✔
337
        errorPrint("%s() LN%d snprintf overflow\n",
×
338
                       __func__, __LINE__);
339
        free(colsBuf);
×
340
        free(command);
×
341
        tmfree(tagsBuf);
×
342
        return -1;
×
343
    } else {
344
        len += n;
139✔
345
    }
346
    for (tagIndex = 0; tagIndex < stbInfo->tags->size; tagIndex++) {
755✔
347
        Field *tag = benchArrayGet(stbInfo->tags, tagIndex);
618✔
348
        if (tag->type == TSDB_DATA_TYPE_BINARY ||
618✔
349
            tag->type == TSDB_DATA_TYPE_NCHAR ||
390✔
350
            tag->type == TSDB_DATA_TYPE_VARBINARY ||
373✔
351
            tag->type == TSDB_DATA_TYPE_GEOMETRY) {
373✔
352
            n = snprintf(tagsBuf + len, tag_buffer_len - len,
490✔
353
                    "%s %s(%d),", tag->name,
245✔
354
                    convertDatatypeToString(tag->type), tag->length);
245✔
355
            if (tag->type == TSDB_DATA_TYPE_GEOMETRY && tag->length < 21) {
245✔
356
                errorPrint("%s() LN%d, geometry filed len must be greater than 21 on %d\n", __func__, __LINE__,
×
357
                           tagIndex);
358
                return -1;
×
359
            }
360
        } else if (tag->type == TSDB_DATA_TYPE_JSON) {
373✔
361
            n = snprintf(tagsBuf + len, tag_buffer_len - len,
2✔
362
                    "%s json", tag->name);
2✔
363
            if (n < 0 || n >= tag_buffer_len - len) {
2✔
364
                errorPrint("%s() LN%d snprintf overflow on %d\n",
×
365
                       __func__, __LINE__, tagIndex);
366
                break;
×
367
            } else {
368
                len += n;
2✔
369
            }
370
            goto skip;
2✔
371
        } else {
372
            n = snprintf(tagsBuf + len, tag_buffer_len - len,
742✔
373
                    "%s %s,", tag->name,
371✔
374
                    convertDatatypeToString(tag->type));
371✔
375
        }
376

377
        if (n < 0 || n >= tag_buffer_len - len) {
616✔
378
            errorPrint("%s() LN%d snprintf overflow on %d\n",
×
379
                       __func__, __LINE__, tagIndex);
380
            break;
×
381
        } else {
382
            len += n;
616✔
383
        }
384
    }
385
    len -= 1;
137✔
386
skip:
139✔
387
    snprintf(tagsBuf + len, tag_buffer_len - len, ")");
139✔
388

389
    int length = snprintf(
139✔
390
        command, TSDB_MAX_ALLOWED_SQL_LEN,
391
        g_arguments->escape_character
139✔
392
            ? "CREATE TABLE IF NOT EXISTS `%s`.`%s` (ts TIMESTAMP%s) TAGS %s"
393
            : "CREATE TABLE IF NOT EXISTS %s.%s (ts TIMESTAMP%s) TAGS %s",
394
        database->dbName, stbInfo->stbName, colsBuf, tagsBuf);
395
    tmfree(colsBuf);
139✔
396
    tmfree(tagsBuf);
139✔
397
    if (stbInfo->comment != NULL) {
139✔
398
        length += snprintf(command + length, TSDB_MAX_ALLOWED_SQL_LEN - length,
×
399
                           " COMMENT '%s'", stbInfo->comment);
400
    }
401
    if (stbInfo->delay >= 0) {
139✔
402
        length += snprintf(command + length,
×
403
                           TSDB_MAX_ALLOWED_SQL_LEN - length, " DELAY %d",
×
404
                           stbInfo->delay);
405
    }
406
    if (stbInfo->file_factor >= 0) {
139✔
407
        length +=
×
408
            snprintf(command + length,
×
409
                     TSDB_MAX_ALLOWED_SQL_LEN - length, " FILE_FACTOR %f",
×
410
                     (float)stbInfo->file_factor / 100);
×
411
    }
412
    if (stbInfo->rollup != NULL) {
139✔
413
        length += snprintf(command + length,
×
414
                           TSDB_MAX_ALLOWED_SQL_LEN - length,
×
415
                           " ROLLUP(%s)", stbInfo->rollup);
416
    }
417

418
    if (stbInfo->max_delay != NULL) {
139✔
419
        length += snprintf(command + length,
×
420
                           TSDB_MAX_ALLOWED_SQL_LEN - length,
×
421
                " MAX_DELAY %s", stbInfo->max_delay);
422
    }
423

424
    if (stbInfo->watermark != NULL) {
139✔
425
        length += snprintf(command + length,
×
426
                           TSDB_MAX_ALLOWED_SQL_LEN - length,
×
427
                " WATERMARK %s", stbInfo->watermark);
428
    }
429

430
    // not support ttl in super table
431
    /*
432
    if (stbInfo->ttl != 0) {
433
        length += snprintf(command + length,
434
                           TSDB_MAX_ALLOWED_SQL_LEN - length,
435
                " TTL %d", stbInfo->ttl);
436
    }
437
    */
438

439
    bool first_sma = true;
139✔
440
    for (int i = 0; i < stbInfo->cols->size; i++) {
972✔
441
        Field * col = benchArrayGet(stbInfo->cols, i);
833✔
442
        if (col->sma) {
833✔
443
            if (first_sma) {
×
444
                n = snprintf(command + length,
×
445
                                   TSDB_MAX_ALLOWED_SQL_LEN - length,
×
446
                        " SMA(%s", col->name);
×
447
                first_sma = false;
×
448
            } else {
449
                n = snprintf(command + length,
×
450
                                   TSDB_MAX_ALLOWED_SQL_LEN - length,
×
451
                        ",%s", col->name);
×
452
            }
453

454
            if (n < 0 || n > TSDB_MAX_ALLOWED_SQL_LEN - length) {
×
455
                errorPrint("%s() LN%d snprintf overflow on %d iteral\n",
×
456
                           __func__, __LINE__, i);
457
                break;
×
458
            } else {
459
                length += n;
×
460
            }
461
        }
462
    }
463
    if (!first_sma) {
139✔
464
        snprintf(command + length, TSDB_MAX_ALLOWED_SQL_LEN - length, ")");
×
465
    }
466
    infoPrint("create stable: <%s>\n", command);
139✔
467

468
    int ret = queryDbExec(database, stbInfo, command);
139✔
469
    free(command);
139✔
470
    return ret;
139✔
471
}
472

473

474
int32_t getVgroupsNative(SBenchConn *conn, SDataBase *database) {
3✔
475
    int     vgroups = 0;
3✔
476
    char    cmd[SHORT_1K_SQL_BUFF_LEN] = "\0";
3✔
477
    snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
3✔
478
            g_arguments->escape_character
3✔
479
            ? "SHOW `%s`.VGROUPS"
480
            : "SHOW %s.VGROUPS",
481
            database->dbName);
482

483
    TAOS_RES* res = taos_query(conn->taos, cmd);
3✔
484
    int code = taos_errno(res);
3✔
485
    if (code) {
3✔
486
        printErrCmdCodeStr(cmd, code, res);
×
487
        return -1;
×
488
    }
489

490
    TAOS_ROW row = NULL;
3✔
491
    while ((row = taos_fetch_row(res)) != NULL) {
13✔
492
        vgroups++;
10✔
493
    }
494
    debugPrint("%s() LN%d, vgroups: %d\n", __func__, __LINE__, vgroups);
3✔
495
    taos_free_result(res);
3✔
496

497
    database->vgroups = vgroups;
3✔
498
    database->vgArray = benchArrayInit(vgroups, sizeof(SVGroup));
3✔
499
    for (int32_t v = 0; (v < vgroups
3✔
500
            && !g_arguments->terminate); v++) {
13✔
501
        SVGroup *vg = benchCalloc(1, sizeof(SVGroup), true);
10✔
502
        benchArrayPush(database->vgArray, vg);
10✔
503
    }
504

505
    res = taos_query(conn->taos, cmd);
3✔
506
    code = taos_errno(res);
3✔
507
    if (code) {
3✔
508
        printErrCmdCodeStr(cmd, code, res);
×
509
        return -1;
×
510
    }
511

512
    int32_t vgItem = 0;
3✔
513
    while (((row = taos_fetch_row(res)) != NULL)
13✔
514
            && !g_arguments->terminate) {
13✔
515
        SVGroup *vg = benchArrayGet(database->vgArray, vgItem);
10✔
516
        vg->vgId = *(int32_t*)row[0];
10✔
517
        vgItem++;
10✔
518
    }
519
    taos_free_result(res);
3✔
520

521
    return vgroups;
3✔
522
}
523

524
#ifdef WEBSOCKET
525
int32_t getVgroupsWS(SBenchConn *conn, SDataBase *database) {
1✔
526
    int vgroups = 0;
1✔
527
    char sql[128] = "\0";
1✔
528
    snprintf(sql, sizeof(sql),
1✔
529
             g_arguments->escape_character
1✔
530
                 ? "SHOW `%s`.VGROUPS"
531
                 : "SHOW %s.VGROUPS",
532
             database->dbName);
533

534
    // query
535
    WS_RES *res = ws_query_timeout(conn->taos_ws, sql, g_arguments->timeout);
1✔
536
    int32_t code = ws_errno(res);
1✔
537
    if (code != 0) {
1✔
538
        // failed
539
        errorPrint("Failed ws_query_timeout <%s>, code: 0x%08x, reason: %s\n",
×
540
                   sql, code, ws_errstr(res));
541
        ws_free_result(res);           
×
542
        return 0;
×
543
    }
544

545
    // fetch
546
    WS_ROW row;
547
    database->vgArray = benchArrayInit(8, sizeof(SVGroup));
1✔
548
    while ( (row = ws_fetch_row(res)) && !g_arguments->terminate) {
5✔
549
        SVGroup *vg = benchCalloc(1, sizeof(SVGroup), true);
4✔
550
        vg->vgId = *(int32_t *)row[0];
4✔
551
        benchArrayPush(database->vgArray, vg);
4✔
552
        vgroups++;
4✔
553
        debugPrint(" ws fetch vgroups vgid=%d cnt=%d \n", vg->vgId, vgroups);
4✔
554
    }
555
    ws_free_result(res);
1✔
556
    database->vgroups = vgroups;
1✔
557

558
    // return count
559
    return vgroups;
1✔
560
}
561

562
/*
563
int32_t getTableVgidWS(SBenchConn *conn, char *db, char *tb, int32_t *vgId) {
564
    char sql[128] = "\0";
565
    snprintf(sql, sizeof(sql),
566
                 "select vgroup_id from information_schema.ins_tables where db_name='%s' and table_name='%s';",
567
                 db, tb);
568
    // query
569
    WS_RES *res = ws_query_timeout(conn->taos_ws, sql, g_arguments->timeout);
570
    int32_t code = ws_errno(res);
571
    if (code != 0) {
572
        // failed
573
        errorPrint("Failed ws_query_timeout <%s>, code: 0x%08x, reason: %s\n",
574
                   sql, code, ws_errstr(res));
575
        ws_free_result(res);           
576
        return code;
577
    }
578

579
    // fetch
580
    WS_ROW row;
581
    while ( (row = ws_fetch_row(res)) && !g_arguments->terminate) {
582
        *vgId = *(int32_t *)row[0];
583
        debugPrint(" getTableVgidWS table:%s vgid=%d\n", tb, *vgId);
584
        break;
585
    }
586
    ws_free_result(res);
587

588
    if(*vgId == 0) {
589
        return -1;
590
    } else {
591
        return 0;
592
    }   
593
}
594
*/
595

596
#endif
597

598
int32_t toolsGetDefaultVGroups() {
×
599
    int32_t cores = toolsGetNumberOfCores();
×
600
    if (cores < 3 ) {
×
601
        return 1;
×
602
    }
603

604
    int64_t MemKB = 0;
×
605
    benchGetTotalMemory(&MemKB);
×
606

607
    infoPrint("check local machine CPU: %d Memory:%d MB \n", cores, (int32_t)(MemKB/1024));
×
608
    if (MemKB <= 2*1024*1024) { // 2G
×
609
        return 1;
×
610
    } else if (MemKB <= 4*1024*1024) { // 4G
×
611
        return 2;
×
612
    } else if (MemKB <= 8*1024*1024) { // 8G
×
613
        return 3;
×
614
    } else if (MemKB <= 16*1024*1024) { // 16G
×
615
        return 4;
×
616
    } else if (MemKB <= 32*1024*1024) { // 32G
×
617
        return 5;
×
618
    } else {
619
        return cores / 2;
×
620
    }
621
}
622

623
int geneDbCreateCmd(SDataBase *database, char *command, int remainVnodes) {
152✔
624
    int dataLen = 0;
152✔
625
    int n;
626
    if (-1 != g_arguments->inputted_vgroups) {
152✔
627
        n = snprintf(command + dataLen, SHORT_1K_SQL_BUFF_LEN - dataLen,
2✔
628
                    g_arguments->escape_character
2✔
629
                        ? "CREATE DATABASE IF NOT EXISTS `%s` VGROUPS %d"
630
                        : "CREATE DATABASE IF NOT EXISTS %s VGROUPS %d",
631
                            database->dbName,
632
                            (-1 != g_arguments->inputted_vgroups)?
2✔
633
                            g_arguments->inputted_vgroups:
2✔
634
                            min(remainVnodes, toolsGetNumberOfCores()));
×
635
    } else {
636
        n = snprintf(command + dataLen, SHORT_1K_SQL_BUFF_LEN - dataLen,
150✔
637
                    g_arguments->escape_character
150✔
638
                        ? "CREATE DATABASE IF NOT EXISTS `%s`"
639
                        : "CREATE DATABASE IF NOT EXISTS %s",
640
                            database->dbName);
641
    }
642

643
    if (n < 0 || n >= SHORT_1K_SQL_BUFF_LEN - dataLen) {
152✔
644
        errorPrint("%s() LN%d snprintf overflow\n",
×
645
                           __func__, __LINE__);
646
        return -1;
×
647
    } else {
648
        dataLen += n;
152✔
649
    }
650

651
    if (database->cfgs) {
152✔
652
        for (int i = 0; i < database->cfgs->size; i++) {
218✔
653
            SDbCfg* cfg = benchArrayGet(database->cfgs, i);
66✔
654
            if (cfg->valuestring) {
66✔
655
                n = snprintf(command + dataLen,
7✔
656
                                        TSDB_MAX_ALLOWED_SQL_LEN - dataLen,
7✔
657
                            " %s %s", cfg->name, cfg->valuestring);
658
            } else {
659
                n = snprintf(command + dataLen,
59✔
660
                                        TSDB_MAX_ALLOWED_SQL_LEN - dataLen,
59✔
661
                            " %s %d", cfg->name, cfg->valueint);
662
            }
663
            if (n < 0 || n >= TSDB_MAX_ALLOWED_SQL_LEN - dataLen) {
66✔
664
                errorPrint("%s() LN%d snprintf overflow on %d\n",
×
665
                           __func__, __LINE__, i);
666
                break;
×
667
            } else {
668
                dataLen += n;
66✔
669
            }
670
        }
671
    }
672

673
    switch (database->precision) {
152✔
674
        case TSDB_TIME_PRECISION_MILLI:
149✔
675
            snprintf(command + dataLen, TSDB_MAX_ALLOWED_SQL_LEN - dataLen,
149✔
676
                                " PRECISION \'ms\';");
677
            break;
149✔
678
        case TSDB_TIME_PRECISION_MICRO:
2✔
679
            snprintf(command + dataLen, TSDB_MAX_ALLOWED_SQL_LEN - dataLen,
2✔
680
                                " PRECISION \'us\';");
681
            break;
2✔
682
        case TSDB_TIME_PRECISION_NANO:
1✔
683
            snprintf(command + dataLen, TSDB_MAX_ALLOWED_SQL_LEN - dataLen,
1✔
684
                                " PRECISION \'ns\';");
685
            break;
1✔
686
    }
687

688
    return dataLen;
152✔
689
}
690

691
int createDatabaseRest(SDataBase* database) {
9✔
692
    int32_t code = 0;
9✔
693
    char       command[SHORT_1K_SQL_BUFF_LEN] = "\0";
9✔
694

695
    int sockfd = createSockFd();
9✔
696
    if (sockfd < 0) {
9✔
697
        return -1;
×
698
    }
699

700
    // drop exist database
701
    snprintf(command, SHORT_1K_SQL_BUFF_LEN,
9✔
702
            g_arguments->escape_character
9✔
703
                ? "DROP DATABASE IF EXISTS `%s`;"
704
                : "DROP DATABASE IF EXISTS %s;",
705
             database->dbName);
706
    code = postProceSql(command,
9✔
707
                        database->dbName,
708
                        database->precision,
709
                        REST_IFACE,
710
                        0,
711
                        g_arguments->port,
9✔
712
                        false,
713
                        sockfd,
714
                        NULL);
715
    if (code != 0) {
9✔
716
        errorPrint("Failed to drop database %s\n", database->dbName);
×
717
    }
718

719
    // create database
720
    int remainVnodes = INT_MAX;
9✔
721
    geneDbCreateCmd(database, command, remainVnodes);
9✔
722
    code = postProceSql(command,
9✔
723
                        database->dbName,
724
                        database->precision,
725
                        REST_IFACE,
726
                        0,
727
                        g_arguments->port,
9✔
728
                        false,
729
                        sockfd,
730
                        NULL);
731
    int32_t trying = g_arguments->keep_trying;
9✔
732
    while (code && trying) {
9✔
733
        infoPrint("will sleep %"PRIu32" milliseconds then "
×
734
                "re-create database %s\n",
735
                g_arguments->trying_interval, database->dbName);
736
        toolsMsleep(g_arguments->trying_interval);
×
737
        code = postProceSql(command,
×
738
                        database->dbName,
739
                        database->precision,
740
                        REST_IFACE,
741
                        0,
742
                        g_arguments->port,
×
743
                        false,
744
                        sockfd,
745
                        NULL);
746
        if (trying != -1) {
×
747
            trying--;
×
748
        }
749
    }
750

751
    destroySockFd(sockfd);
9✔
752
    return code;
9✔
753
}
754

755
int32_t getRemainVnodes(SBenchConn *conn) {
×
756
    int remainVnodes = 0;
×
757
    char command[SHORT_1K_SQL_BUFF_LEN] = "SHOW DNODES";
×
758

759
    TAOS_RES *res = taos_query(conn->taos, command);
×
760
    int32_t   code = taos_errno(res);
×
761
    if (code) {
×
762
        printErrCmdCodeStr(command, code, res);
×
763
        closeBenchConn(conn);
×
764
        return -1;
×
765
    }
766
    TAOS_ROW row = NULL;
×
767
    while ((row = taos_fetch_row(res)) != NULL) {
×
768
        remainVnodes += (*(int16_t*)(row[3]) - *(int16_t*)(row[2]));
×
769
    }
770
    debugPrint("%s() LN%d, remainVnodes: %d\n",
×
771
               __func__, __LINE__, remainVnodes);
772
    taos_free_result(res);
×
773
    return remainVnodes;
×
774
}
775

776
int createDatabaseTaosc(SDataBase* database) {
147✔
777
    char command[SHORT_1K_SQL_BUFF_LEN] = "\0";
147✔
778
    // conn
779
    SBenchConn* conn = initBenchConn();
147✔
780
    if (NULL == conn) {
147✔
781
        return -1;
4✔
782
    }
783

784
    // drop stream in old database
785
    for (int i = 0; i < g_arguments->streams->size; i++) {
145✔
786
        SSTREAM* stream = benchArrayGet(g_arguments->streams, i);
2✔
787
        if (stream->drop) {
2✔
788
            snprintf(command, SHORT_1K_SQL_BUFF_LEN,
2✔
789
                        "DROP STREAM IF EXISTS %s;",
790
                    stream->stream_name);
2✔
791
            if (queryDbExecCall(conn, command)) {
2✔
792
                closeBenchConn(conn);
×
793
                return -1;
×
794
            }
795
            infoPrint("%s\n", command);
2✔
796
            memset(command, 0, SHORT_1K_SQL_BUFF_LEN);
2✔
797
        }
798
    }
799

800
    // drop old database
801
    snprintf(command, SHORT_1K_SQL_BUFF_LEN,
143✔
802
            g_arguments->escape_character
143✔
803
                ? "DROP DATABASE IF EXISTS `%s`;":
804
            "DROP DATABASE IF EXISTS %s;",
805
             database->dbName);
806
    if (0 != queryDbExecCall(conn, command)) {
143✔
807
#ifdef WEBSOCKET
808
        if (g_arguments->websocket) {
×
809
            warnPrint("%s", "TDengine cloud normal users have no privilege "
×
810
                      "to drop database! DROP DATABASE failure is ignored!\n");
811
        } else {
812
#endif
813
            closeBenchConn(conn);
×
814
            return -1;
×
815
#ifdef WEBSOCKET
816
        }
817
#endif
818
    }
819

820
    // get remain vgroups
821
    int remainVnodes = INT_MAX;
143✔
822
#ifndef WEBSOCKET    
823
    if (g_arguments->bind_vgroup) {
824
        remainVnodes = getRemainVnodes(conn);
825
        if (0 >= remainVnodes) {
826
            errorPrint("Remain vnodes %d, failed to create database\n",
827
                       remainVnodes);
828
            return -1;
829
        }
830
    }
831
#endif
832

833
    // generate and execute create database sql
834
    geneDbCreateCmd(database, command, remainVnodes);
143✔
835
    int32_t code = queryDbExecCall(conn, command);
143✔
836
    int32_t trying = g_arguments->keep_trying;
143✔
837
    while (code && trying) {
143✔
UNCOV
838
        infoPrint("will sleep %"PRIu32" milliseconds then "
×
839
                  "re-create database %s\n",
840
                  g_arguments->trying_interval, database->dbName);
UNCOV
841
        toolsMsleep(g_arguments->trying_interval);
×
UNCOV
842
        code = queryDbExecCall(conn, command);
×
UNCOV
843
        if (trying != -1) {
×
UNCOV
844
            trying--;
×
845
        }
846
    }
847

848
    if (code) {
143✔
849
#ifdef WEBSOCKET
850
        if (g_arguments->websocket) {
1✔
851
            warnPrint("%s", "TDengine cloud normal users have no privilege "
×
852
                      "to create database! CREATE DATABASE "
853
                      "failure is ignored!\n");
854
        } else {
855
#endif
856

857
            closeBenchConn(conn);
1✔
858
            errorPrint("\ncreate database %s failed!\n\n",
1✔
859
               database->dbName);
860
            return -1;
1✔
861
#ifdef WEBSOCKET
862
        }
863
#endif
864
    }
865
    infoPrint("command to create database: <%s>\n", command);
142✔
866

867

868
    // malloc and get vgroup
869
    if (g_arguments->bind_vgroup) {
142✔
870
        int32_t vgroups;
871
#ifdef WEBSOCKET
872
        if (g_arguments->websocket) {
2✔
873
            vgroups = getVgroupsWS(conn, database);
×
874
        } else {
875
#endif
876
            vgroups = getVgroupsNative(conn, database);
2✔
877
#ifdef WEBSOCKET
878
        }
879
#endif
880
        if (vgroups <= 0) {
2✔
881
            closeBenchConn(conn);
×
882
            errorPrint("Database %s's vgroups is %d\n",
×
883
                        database->dbName, vgroups);
884
            return -1;
×
885
        }
886
    }
887

888
    closeBenchConn(conn);
142✔
889
    return 0;
142✔
890
}
891

892
int createDatabase(SDataBase* database) {
156✔
893
    int ret = 0;
156✔
894
    if (REST_IFACE == g_arguments->iface || SML_REST_IFACE == g_arguments->iface) {
156✔
895
        ret = createDatabaseRest(database);
9✔
896
    } else {
897
        ret = createDatabaseTaosc(database);
147✔
898
    }
899
#if 0
900
#ifdef LINUX
901
    infoPrint("%s() LN%d, ret: %d\n", __func__, __LINE__, ret);
902
    sleep(10);
903
    infoPrint("%s() LN%d, ret: %d\n", __func__, __LINE__, ret);
904
#elif defined(DARWIN)
905
    sleep(2);
906
#else
907
    Sleep(2);
908
#endif
909
#endif
910

911
    return ret;
156✔
912
}
913

914
static int generateChildTblName(int len, char *buffer, SDataBase *database,
21,217✔
915
                                SSuperTable *stbInfo, uint64_t tableSeq, char* tagData, int i,
916
                                char *ttl) {
917
    if (0 == len) {
21,217✔
918
        memset(buffer, 0, TSDB_MAX_ALLOWED_SQL_LEN);
20,609✔
919
        len += snprintf(buffer + len,
20,609✔
920
                        TSDB_MAX_ALLOWED_SQL_LEN - len, "CREATE TABLE");
20,609✔
921
    }
922

923
    len += snprintf(
21,217✔
924
            buffer + len, TSDB_MAX_ALLOWED_SQL_LEN - len,
21,217✔
925
            g_arguments->escape_character
21,217✔
926
            ? " IF NOT EXISTS `%s`.`%s%" PRIu64 "` USING `%s`.`%s` TAGS (%s) %s "
927
            : " IF NOT EXISTS %s.%s%" PRIu64 " USING %s.%s TAGS (%s) %s ",
928
            database->dbName, stbInfo->childTblPrefix, tableSeq, database->dbName,
929
            stbInfo->stbName,
930
            tagData + i * stbInfo->lenOfTags, ttl);
21,217✔
931

932
    return len;
21,217✔
933
}
934

935
static int getBatchOfTblCreating(threadInfo *pThreadInfo,
21,204✔
936
                                         SSuperTable *stbInfo) {
937
    BArray *batchArray = stbInfo->batchTblCreatingNumbersArray;
21,204✔
938
    if (batchArray) {
21,204✔
939
        int *batch = benchArrayGet(
18✔
940
                batchArray, pThreadInfo->posOfTblCreatingBatch);
18✔
941
        pThreadInfo->posOfTblCreatingBatch++;
19✔
942
        if (pThreadInfo->posOfTblCreatingBatch == batchArray->size) {
19✔
943
            pThreadInfo->posOfTblCreatingBatch = 0;
4✔
944
        }
945
        return *batch;
19✔
946
    }
947
    return 0;
21,186✔
948
}
949

950
static int getIntervalOfTblCreating(threadInfo *pThreadInfo,
20,291✔
951
                                         SSuperTable *stbInfo) {
952
    BArray *intervalArray = stbInfo->batchTblCreatingIntervalsArray;
20,291✔
953
    if (intervalArray) {
20,291✔
954
        int *interval = benchArrayGet(
8✔
955
                intervalArray, pThreadInfo->posOfTblCreatingInterval);
8✔
956
        pThreadInfo->posOfTblCreatingInterval++;
8✔
957
        if (pThreadInfo->posOfTblCreatingInterval == intervalArray->size) {
8✔
958
            pThreadInfo->posOfTblCreatingInterval = 0;
2✔
959
        }
960
        return *interval;
8✔
961
    }
962
    return 0;
20,283✔
963
}
964

965
// table create thread
966
static void *createTable(void *sarg) {
533✔
967
    if (g_arguments->supplementInsert) {
533✔
968
        return NULL;
1✔
969
    }
970

971
    threadInfo  *pThreadInfo    = (threadInfo *)sarg;
532✔
972
    SDataBase   *database       = pThreadInfo->dbInfo;
532✔
973
    SSuperTable *stbInfo        = pThreadInfo->stbInfo;
532✔
974
    uint64_t    lastTotalCreate = 0;
532✔
975
    uint64_t    lastPrintTime   = toolsGetTimestampMs();
532✔
976
    int32_t     len             = 0;
532✔
977
    int32_t     batchNum        = 0;
532✔
978
    char ttl[SMALL_BUFF_LEN]    = "";
532✔
979

980
#ifdef LINUX
981
    prctl(PR_SET_NAME, "createTable");
532✔
982
#endif
983
    pThreadInfo->buffer = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, false);
532✔
984
    infoPrint(
532✔
985
              "thread[%d] start creating table from %" PRIu64 " to %" PRIu64
986
              "\n",
987
              pThreadInfo->threadID, pThreadInfo->start_table_from,
988
              pThreadInfo->end_table_to);
989
    if (stbInfo->ttl != 0) {
532✔
990
        snprintf(ttl, SMALL_BUFF_LEN, "TTL %d", stbInfo->ttl);
20✔
991
    }
992

993
    // tag read from csv
994
    FILE *csvFile = openTagCsv(stbInfo);
532✔
995
    // malloc
996
    char* tagData = benchCalloc(TAG_BATCH_COUNT, stbInfo->lenOfTags, false);
532✔
997
    int         w = 0; // record tagData
529✔
998

999
    int smallBatchCount = 0;
529✔
1000
    for (uint64_t i = pThreadInfo->start_table_from;
529✔
1001
                  i <= pThreadInfo->end_table_to && !g_arguments->terminate;
21,788✔
1002
                  i++) {
21,259✔
1003
        if (g_arguments->terminate) {
21,256✔
1004
            goto create_table_end;
×
1005
        }
1006
        if (!stbInfo->use_metric || stbInfo->tags->size == 0) {
21,256✔
1007
            if (stbInfo->childTblCount == 1) {
42✔
1008
                snprintf(pThreadInfo->buffer, TSDB_MAX_ALLOWED_SQL_LEN,
3✔
1009
                         g_arguments->escape_character
3✔
1010
                         ? "CREATE TABLE IF NOT EXISTS `%s`.`%s` %s;"
1011
                         : "CREATE TABLE IF NOT EXISTS %s.%s %s;",
1012
                         database->dbName, stbInfo->stbName,
1013
                         stbInfo->colsOfCreateChildTable);
1014
            } else {
1015
                snprintf(pThreadInfo->buffer, TSDB_MAX_ALLOWED_SQL_LEN,
39✔
1016
                         g_arguments->escape_character
39✔
1017
                         ? "CREATE TABLE IF NOT EXISTS `%s`.`%s` %s;"
1018
                         : "CREATE TABLE IF NOT EXISTS %s.%s %s;",
1019
                         database->dbName,
1020
                         stbInfo->childTblArray[i]->name,
39✔
1021
                         stbInfo->colsOfCreateChildTable);
1022
            }
1023
            batchNum++;
42✔
1024
        } else {
1025
            if (0 == len) {
21,214✔
1026
                batchNum = 0;
20,610✔
1027
            }
1028
            // generator
1029
            if (w == 0) {
21,214✔
1030
                if(!generateTagData(stbInfo, tagData, TAG_BATCH_COUNT, csvFile, NULL)) {
693✔
1031
                    goto create_table_end;
×
1032
                }
1033
            }
1034

1035
            len = generateChildTblName(len, pThreadInfo->buffer,
21,214✔
1036
                                       database, stbInfo, i, tagData, w, ttl);
1037
            // move next
1038
            if (++w >= TAG_BATCH_COUNT) {
21,220✔
1039
                // reset for gen again
1040
                w = 0;
193✔
1041
            }                           
1042

1043
            batchNum++;
21,220✔
1044
            smallBatchCount++;
21,220✔
1045

1046
            int smallBatch = getBatchOfTblCreating(pThreadInfo, stbInfo);
21,220✔
1047
            if ((!smallBatch || (smallBatchCount == smallBatch))
21,211✔
1048
                    && (batchNum < stbInfo->batchTblCreatingNum)
21,203✔
1049
                    && ((TSDB_MAX_ALLOWED_SQL_LEN - len) >=
966✔
1050
                        (stbInfo->lenOfTags + EXTRA_SQL_LEN))) {
966✔
1051
                continue;
966✔
1052
            } else {
1053
                smallBatchCount = 0;
20,245✔
1054
            }
1055
        }
1056

1057
        len = 0;
20,287✔
1058

1059
        int ret = 0;
20,287✔
1060
        debugPrint("thread[%d] creating table: %s\n", pThreadInfo->threadID,
20,287✔
1061
                   pThreadInfo->buffer);
1062
        if (REST_IFACE == stbInfo->iface) {
20,292✔
1063
            ret = queryDbExecRest(pThreadInfo->buffer,
12✔
1064
                                  database->dbName,
1065
                                  database->precision,
1066
                                  stbInfo->iface,
12✔
1067
                                  stbInfo->lineProtocol,
12✔
1068
                                  stbInfo->tcpTransfer,
12✔
1069
                                  pThreadInfo->sockfd);
1070
        } else {
1071
            ret = queryDbExecCall(pThreadInfo->conn, pThreadInfo->buffer);
20,280✔
1072
            int32_t trying = g_arguments->keep_trying;
20,282✔
1073
            while (ret && trying) {
20,282✔
1074
                infoPrint("will sleep %"PRIu32" milliseconds then re-create "
×
1075
                          "table %s\n",
1076
                          g_arguments->trying_interval, pThreadInfo->buffer);
1077
                toolsMsleep(g_arguments->trying_interval);
×
1078
                ret = queryDbExecCall(pThreadInfo->conn, pThreadInfo->buffer);
×
1079
                if (trying != -1) {
×
1080
                    trying--;
×
1081
                }
1082
            }
1083
        }
1084

1085
        if (0 != ret) {
20,291✔
1086
            g_fail = true;
×
1087
            goto create_table_end;
×
1088
        }
1089
        uint64_t intervalOfTblCreating = getIntervalOfTblCreating(pThreadInfo,
20,291✔
1090
                                                                  stbInfo);
1091
        if (intervalOfTblCreating) {
20,290✔
1092
            debugPrint("will sleep %"PRIu64" milliseconds "
8✔
1093
                       "for table creating interval\n", intervalOfTblCreating);
1094
            toolsMsleep(intervalOfTblCreating);
8✔
1095
        }
1096

1097
        pThreadInfo->tables_created += batchNum;
20,290✔
1098
        batchNum = 0;
20,290✔
1099
        uint64_t currentPrintTime = toolsGetTimestampMs();
20,290✔
1100
        if (currentPrintTime - lastPrintTime > PRINT_STAT_INTERVAL) {
20,291✔
1101
            float speed = (pThreadInfo->tables_created - lastTotalCreate) * 1000 / (currentPrintTime - lastPrintTime);
×
1102
            infoPrint("thread[%d] already created %" PRId64 " tables, peroid speed: %.0f tables/s\n",
×
1103
                       pThreadInfo->threadID, pThreadInfo->tables_created, speed);
1104
            lastPrintTime   = currentPrintTime;
2✔
1105
            lastTotalCreate = pThreadInfo->tables_created;
2✔
1106
        }
1107
    }
1108

1109
    if (0 != len) {
532✔
1110
        int ret = 0;
359✔
1111
        debugPrint("thread[%d] creating table: %s\n", pThreadInfo->threadID,
359✔
1112
                   pThreadInfo->buffer);
1113
        if (REST_IFACE == stbInfo->iface) {
359✔
1114
            ret = queryDbExecRest(pThreadInfo->buffer,
8✔
1115
                                  database->dbName,
1116
                                  database->precision,
1117
                                  stbInfo->iface,
8✔
1118
                                  stbInfo->lineProtocol,
8✔
1119
                                  stbInfo->tcpTransfer,
8✔
1120
                                  pThreadInfo->sockfd);
1121
        } else {
1122
            ret = queryDbExecCall(pThreadInfo->conn, pThreadInfo->buffer);
351✔
1123
        }
1124
        if (0 != ret) {
359✔
1125
            g_fail = true;
×
1126
            goto create_table_end;
×
1127
        }
1128
        pThreadInfo->tables_created += batchNum;
359✔
1129
        debugPrint("thread[%d] already created %" PRId64 " tables\n",
359✔
1130
                   pThreadInfo->threadID, pThreadInfo->tables_created);
1131
    }
1132
create_table_end:
530✔
1133
    // free
1134
    tmfree(tagData);
532✔
1135
    tmfree(pThreadInfo->buffer);
532✔
1136
    pThreadInfo->buffer = NULL;
532✔
1137
    if(csvFile) {
532✔
1138
        fclose(csvFile);
12✔
1139
    }
1140
    return NULL;
532✔
1141
}
1142

1143
static int startMultiThreadCreateChildTable(SDataBase* database, SSuperTable* stbInfo) {
140✔
1144
    int32_t code    = -1;
140✔
1145
    int32_t threads = g_arguments->table_threads;
140✔
1146
    int64_t ntables;
1147
    if (stbInfo->childTblTo > 0) {
140✔
1148
        ntables = stbInfo->childTblTo - stbInfo->childTblFrom;
1✔
1149
    } else if(stbInfo->childTblFrom > 0) {
139✔
1150
        ntables = stbInfo->childTblCount - stbInfo->childTblFrom;
×
1151
    } else {
1152
        ntables = stbInfo->childTblCount;
139✔
1153
    }
1154
    pthread_t   *pids = benchCalloc(1, threads * sizeof(pthread_t), false);
140✔
1155
    threadInfo  *infos = benchCalloc(1, threads * sizeof(threadInfo), false);
140✔
1156
    uint64_t     tableFrom = stbInfo->childTblFrom;
140✔
1157
    if (threads < 1) {
140✔
1158
        threads = 1;
×
1159
    }
1160
    if (ntables == 0) {
140✔
1161
        errorPrint("failed to create child table, childTblCount: %"PRId64"\n", ntables);
×
1162
        goto over;
×
1163
    }
1164

1165
    int64_t div = ntables / threads;
140✔
1166
    if (div < 1) {
140✔
1167
        threads = (int)ntables;
71✔
1168
        div = 1;
71✔
1169
    }
1170
    int64_t mod = ntables % threads;
140✔
1171

1172
    int threadCnt = 0;
140✔
1173
    for (uint32_t i = 0; (i < threads && !g_arguments->terminate); i++) {
673✔
1174
        threadInfo *pThreadInfo = infos + i;
533✔
1175
        pThreadInfo->threadID = i;
533✔
1176
        pThreadInfo->stbInfo = stbInfo;
533✔
1177
        pThreadInfo->dbInfo = database;
533✔
1178
        if (REST_IFACE == stbInfo->iface) {
533✔
1179
            int sockfd = createSockFd();
17✔
1180
            if (sockfd < 0) {
17✔
1181
                FREE_PIDS_INFOS_RETURN_MINUS_1();
×
1182
            }
1183
            pThreadInfo->sockfd = sockfd;
17✔
1184
        } else {
1185
            pThreadInfo->conn = initBenchConn();
516✔
1186
            if (NULL == pThreadInfo->conn) {
516✔
1187
                goto over;
×
1188
            }
1189
        }
1190
        pThreadInfo->start_table_from = tableFrom;
533✔
1191
        pThreadInfo->ntables          = i < mod ? div + 1 : div;
533✔
1192
        pThreadInfo->end_table_to     = i < mod ? tableFrom + div : tableFrom + div - 1;
533✔
1193
        tableFrom = pThreadInfo->end_table_to + 1;
533✔
1194
        pThreadInfo->tables_created = 0;
533✔
1195
        debugPrint("div table by thread. i=%d from=%"PRId64" to=%"PRId64" ntable=%"PRId64"\n", i, pThreadInfo->start_table_from,
533✔
1196
                                        pThreadInfo->end_table_to, pThreadInfo->ntables);
1197
        pthread_create(pids + i, NULL, createTable, pThreadInfo);
533✔
1198
        threadCnt ++;
533✔
1199
    }
1200

1201
    for (int i = 0; i < threadCnt; i++) {
673✔
1202
        pthread_join(pids[i], NULL);
533✔
1203
    }
1204

1205
    if (g_arguments->terminate)  toolsMsleep(100);
140✔
1206

1207
    for (int i = 0; i < threadCnt; i++) {
673✔
1208
        threadInfo *pThreadInfo = infos + i;
533✔
1209
        g_arguments->actualChildTables += pThreadInfo->tables_created;
533✔
1210

1211
        if ((REST_IFACE != stbInfo->iface) && pThreadInfo->conn) {
533✔
1212
            closeBenchConn(pThreadInfo->conn);
516✔
1213
        }
1214
    }
1215

1216
    if (g_fail) {
140✔
1217
        goto over;
×
1218
    }
1219
    code = 0;
140✔
1220
over:
140✔
1221
    free(pids);
140✔
1222
    free(infos);
140✔
1223
    return code;
140✔
1224
}
1225

1226
static int createChildTables() {
161✔
1227
    int32_t    code;
1228
    infoPrint("start creating %" PRId64 " table(s) with %d thread(s)\n",
161✔
1229
              g_arguments->totalChildTables, g_arguments->table_threads);
1230
    if (g_arguments->fpOfInsertResult) {
161✔
1231
        infoPrintToFile(
161✔
1232
                  "start creating %" PRId64 " table(s) with %d thread(s)\n",
1233
                  g_arguments->totalChildTables, g_arguments->table_threads);
1234
    }
1235
    int64_t start = (double)toolsGetTimestampMs();
161✔
1236

1237
    for (int i = 0; (i < g_arguments->databases->size
161✔
1238
            && !g_arguments->terminate); i++) {
323✔
1239
        SDataBase * database = benchArrayGet(g_arguments->databases, i);
162✔
1240
        if (database->superTbls) {
162✔
1241
            for (int j = 0; (j < database->superTbls->size
162✔
1242
                    && !g_arguments->terminate); j++) {
391✔
1243
                SSuperTable * stbInfo = benchArrayGet(database->superTbls, j);
229✔
1244
                if (stbInfo->autoTblCreating || stbInfo->iface == SML_IFACE
229✔
1245
                        || stbInfo->iface == SML_REST_IFACE) {
163✔
1246
                    g_arguments->autoCreatedChildTables +=
80✔
1247
                            stbInfo->childTblCount;
80✔
1248
                    continue;
80✔
1249
                }
1250
                if (stbInfo->childTblExists) {
149✔
1251
                    g_arguments->existedChildTables +=
9✔
1252
                            stbInfo->childTblCount;
9✔
1253
                    continue;
9✔
1254
                }
1255
                debugPrint("colsOfCreateChildTable: %s\n",
140✔
1256
                        stbInfo->colsOfCreateChildTable);
1257

1258
                code = startMultiThreadCreateChildTable(database, stbInfo);
140✔
1259
                if (code && !g_arguments->terminate) {
140✔
1260
                    return code;
×
1261
                }
1262
            }
1263
        }
1264
    }
1265

1266
    int64_t end = toolsGetTimestampMs();
161✔
1267
    if(end == start) {
161✔
1268
        end += 1;
39✔
1269
    }
1270
    succPrint(
161✔
1271
            "Spent %.4f seconds to create %" PRId64
1272
            " table(s) with %d thread(s) speed: %.0f tables/s, already exist %" PRId64
1273
            " table(s), actual %" PRId64 " table(s) pre created, %" PRId64
1274
            " table(s) will be auto created\n",
1275
            (float)(end - start) / 1000.0,
1276
            g_arguments->totalChildTables,
1277
            g_arguments->table_threads,
1278
            g_arguments->actualChildTables * 1000 / (float)(end - start),
1279
            g_arguments->existedChildTables,
1280
            g_arguments->actualChildTables,
1281
            g_arguments->autoCreatedChildTables);
1282
    return 0;
161✔
1283
}
1284

1285
static void freeChildTable(SChildTable *childTbl, int colsSize) {
32,015✔
1286
    if (childTbl->useOwnSample) {
32,015✔
1287
        if (childTbl->childCols) {
12✔
1288
            for (int col = 0; col < colsSize; col++) {
28✔
1289
                ChildField *childCol =
1290
                    benchArrayGet(childTbl->childCols, col);
20✔
1291
                if (childCol) {
20✔
1292
                    tmfree(childCol->stmtData.data);
20✔
1293
                    childCol->stmtData.data = NULL;
20✔
1294
                    tmfree(childCol->stmtData.is_null);
20✔
1295
                    childCol->stmtData.is_null = NULL;
20✔
1296
                    tmfree(childCol->stmtData.lengths);
20✔
1297
                    childCol->stmtData.lengths = NULL;
20✔
1298
                }
1299
            }
1300
            benchArrayDestroy(childTbl->childCols);
8✔
1301
        }
1302
        tmfree(childTbl->sampleDataBuf);
12✔
1303
    }
1304
    tmfree(childTbl);
32,015✔
1305
}
32,015✔
1306

1307
void postFreeResource() {
200✔
1308
    infoPrint("%s\n", "free resource and exit ...");
200✔
1309
    if (!g_arguments->terminate) {
200✔
1310
        tmfclose(g_arguments->fpOfInsertResult);
199✔
1311
    }
1312

1313
    for (int i = 0; i < g_arguments->databases->size; i++) {
401✔
1314
        SDataBase * database = benchArrayGet(g_arguments->databases, i);
201✔
1315
        if (database->cfgs) {
201✔
1316
            for (int c = 0; c < database->cfgs->size; c++) {
271✔
1317
                SDbCfg *cfg = benchArrayGet(database->cfgs, c);
70✔
1318
                if (cfg->valuestring && cfg->free) {
70✔
1319
                    tmfree(cfg->valuestring);
1✔
1320
                    cfg->valuestring = NULL;
1✔
1321
                }
1322
            }
1323
            benchArrayDestroy(database->cfgs);
201✔
1324
        }
1325
        if (database->superTbls) {
201✔
1326
            for (uint64_t j = 0; j < database->superTbls->size; j++) {
470✔
1327
                SSuperTable * stbInfo = benchArrayGet(database->superTbls, j);
269✔
1328
                tmfree(stbInfo->colsOfCreateChildTable);
269✔
1329
                stbInfo->colsOfCreateChildTable = NULL;
269✔
1330
                tmfree(stbInfo->sampleDataBuf);
269✔
1331
                stbInfo->sampleDataBuf = NULL;
269✔
1332
                tmfree(stbInfo->partialColNameBuf);
269✔
1333
                stbInfo->partialColNameBuf = NULL;
269✔
1334
                benchArrayDestroy(stbInfo->batchTblCreatingNumbersArray);
269✔
1335
                benchArrayDestroy(stbInfo->batchTblCreatingIntervalsArray);
269✔
1336
                for (int k = 0; k < stbInfo->tags->size; k++) {
1,622✔
1337
                    Field * tag = benchArrayGet(stbInfo->tags, k);
1,353✔
1338
                    tmfree(tag->stmtData.data);
1,353✔
1339
                    tag->stmtData.data = NULL;
1,353✔
1340
                    tmfree(tag->stmtData.is_null);
1,353✔
1341
                    tag->stmtData.is_null = NULL;
1,353✔
1342
                    tmfree(tag->stmtData.lengths);
1,353✔
1343
                    tag->stmtData.lengths = NULL;
1,353✔
1344
                }
1345
                benchArrayDestroy(stbInfo->tags);
269✔
1346

1347
                for (int k = 0; k < stbInfo->cols->size; k++) {
1,487✔
1348
                    Field * col = benchArrayGet(stbInfo->cols, k);
1,218✔
1349
                    tmfree(col->stmtData.data);
1,218✔
1350
                    col->stmtData.data = NULL;
1,218✔
1351
                    tmfree(col->stmtData.is_null);
1,218✔
1352
                    col->stmtData.is_null = NULL;
1,218✔
1353
                    tmfree(col->stmtData.lengths);
1,218✔
1354
                    col->stmtData.lengths = NULL;
1,218✔
1355
                }
1356
                if (g_arguments->test_mode == INSERT_TEST) {
269✔
1357
                    if (stbInfo->childTblArray) {
234✔
1358
                        for (int64_t child = 0; child < stbInfo->childTblCount;
32,244✔
1359
                                child++) {
32,015✔
1360
                            SChildTable *childTbl = stbInfo->childTblArray[child];
32,015✔
1361
                            if (childTbl) {
32,015✔
1362
                                tmfree(childTbl->name);
32,015✔
1363
                                freeChildTable(childTbl, stbInfo->cols->size);
32,015✔
1364
                            }
1365
                        }
1366
                    }
1367
                }
1368
                benchArrayDestroy(stbInfo->cols);
269✔
1369
                tmfree(stbInfo->childTblArray);
269✔
1370
                stbInfo->childTblArray = NULL;
269✔
1371
                benchArrayDestroy(stbInfo->tsmas);
269✔
1372

1373
                // free sqls
1374
                if(stbInfo->sqls) {
269✔
1375
                    char **sqls = stbInfo->sqls;
×
1376
                    while (*sqls) {
×
1377
                        free(*sqls);
×
1378
                        sqls++;
×
1379
                    }
1380
                    tmfree(stbInfo->sqls);
×
1381
                }
1382

1383
                // thread_bind
1384
                if (database->vgArray) {
269✔
1385
                    for (int32_t v = 0; v < database->vgroups; v++) {
18✔
1386
                        SVGroup *vg = benchArrayGet(database->vgArray, v);
14✔
1387
                        tmfree(vg->childTblArray);
14✔
1388
                        vg->childTblArray = NULL;
14✔
1389
                    }
1390
                    benchArrayDestroy(database->vgArray);
4✔
1391
                    database->vgArray = NULL;
4✔
1392
                }
1393
            }
1394
            benchArrayDestroy(database->superTbls);
201✔
1395
        }
1396
    }
1397
    benchArrayDestroy(g_arguments->databases);
200✔
1398
    benchArrayDestroy(g_arguments->streams);
200✔
1399
    tools_cJSON_Delete(root);
200✔
1400
}
200✔
1401

1402
int32_t execInsert(threadInfo *pThreadInfo, uint32_t k, int64_t *delay3) {
48,007✔
1403
    SDataBase *  database = pThreadInfo->dbInfo;
48,007✔
1404
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
48,007✔
1405
    TAOS_RES *   res = NULL;
48,007✔
1406
    int32_t      code = 0;
48,007✔
1407
    uint16_t     iface = stbInfo->iface;
48,007✔
1408
    int64_t      start = 0;
48,007✔
1409
    int32_t      affectRows = 0;
48,007✔
1410

1411
    int32_t trying = (stbInfo->keep_trying)?
96,014✔
1412
        stbInfo->keep_trying:g_arguments->keep_trying;
48,007✔
1413
    int32_t trying_interval = stbInfo->trying_interval?
96,014✔
1414
        stbInfo->trying_interval:g_arguments->trying_interval;
48,007✔
1415
    int protocol = stbInfo->lineProtocol;
48,007✔
1416

1417
    switch (iface) {
48,007✔
1418
        case TAOSC_IFACE:
44,923✔
1419
            debugPrint("buffer: %s\n", pThreadInfo->buffer);
44,923✔
1420
            code = queryDbExecCall(pThreadInfo->conn, pThreadInfo->buffer);
44,923✔
1421
            while (code && trying && !g_arguments->terminate) {
44,935✔
UNCOV
1422
                infoPrint("will sleep %"PRIu32" milliseconds then re-insert\n",
×
1423
                          trying_interval);
UNCOV
1424
                toolsMsleep(trying_interval);
×
UNCOV
1425
                code = queryDbExecCall(pThreadInfo->conn, pThreadInfo->buffer);
×
1426
                if (trying != -1) {
×
UNCOV
1427
                    trying--;
×
1428
                }
1429
            }
1430
            break;
44,935✔
1431

1432
        case REST_IFACE:
28✔
1433
            debugPrint("buffer: %s\n", pThreadInfo->buffer);
28✔
1434
            code = postProceSql(pThreadInfo->buffer,
28✔
1435
                                database->dbName,
1436
                                database->precision,
1437
                                stbInfo->iface,
28✔
1438
                                stbInfo->lineProtocol,
28✔
1439
                                g_arguments->port,
28✔
1440
                                stbInfo->tcpTransfer,
28✔
1441
                                pThreadInfo->sockfd,
1442
                                pThreadInfo->filePath);
28✔
1443
            while (code && trying && !g_arguments->terminate) {
28✔
1444
                infoPrint("will sleep %"PRIu32" milliseconds then re-insert\n",
×
1445
                          trying_interval);
1446
                toolsMsleep(trying_interval);
×
1447
                code = postProceSql(pThreadInfo->buffer,
×
1448
                                    database->dbName,
1449
                                    database->precision,
1450
                                    stbInfo->iface,
×
1451
                                    stbInfo->lineProtocol,
×
1452
                                    g_arguments->port,
×
1453
                                    stbInfo->tcpTransfer,
×
1454
                                    pThreadInfo->sockfd,
1455
                                    pThreadInfo->filePath);
×
1456
                if (trying != -1) {
×
1457
                    trying--;
×
1458
                }
1459
            }
1460
            break;
28✔
1461

1462
        case STMT_IFACE:
1,985✔
1463
            // add batch
1464
            if(!stbInfo->autoTblCreating) {
1,985✔
1465
                start = toolsGetTimestampUs();
1,951✔
1466
                if (taos_stmt_add_batch(pThreadInfo->conn->stmt) != 0) {
1,950✔
1467
                    errorPrint("taos_stmt_add_batch() failed! reason: %s\n",
×
1468
                            taos_stmt_errstr(pThreadInfo->conn->stmt));
1469
                    return -1;
×
1470
                }
1471
                if(delay3) {
1,953✔
1472
                    *delay3 += toolsGetTimestampUs() - start;
1,952✔
1473
                }
1474
            }
1475
            
1476
            // execute 
1477
            code = taos_stmt_execute(pThreadInfo->conn->stmt);
1,986✔
1478
            if (code) {
1,987✔
1479
                errorPrint(
1✔
1480
                           "failed to execute insert statement. reason: %s\n",
1481
                           taos_stmt_errstr(pThreadInfo->conn->stmt));
1482
                code = -1;
×
1483
            }
1484
            break;
1,986✔
1485

1486
        case STMT2_IFACE:
133✔
1487
            // execute 
1488
            code = taos_stmt2_exec(pThreadInfo->conn->stmt2, &affectRows);
133✔
1489
            if (code) {
133✔
1490
                errorPrint( "failed to call taos_stmt2_exec(). reason: %s\n", taos_stmt2_error(pThreadInfo->conn->stmt2));
×
1491
                code = -1;
×
1492
            }
1493
            debugPrint( "succ call taos_stmt2_exec() affectRows:%d\n", affectRows);
133✔
1494
            break;
132✔
1495

1496
        case SML_IFACE:
784✔
1497
            res = taos_schemaless_insert(
1,237✔
1498
                pThreadInfo->conn->taos, pThreadInfo->lines,
784✔
1499
                (TSDB_SML_JSON_PROTOCOL == protocol
1500
                    || SML_JSON_TAOS_FORMAT == protocol)
453✔
1501
                    ? 0 : k,
1502
                (SML_JSON_TAOS_FORMAT == protocol)
1503
                    ? TSDB_SML_JSON_PROTOCOL : protocol,
1504
                (TSDB_SML_LINE_PROTOCOL == protocol)
1505
                    ? database->sml_precision
1506
                    : TSDB_SML_TIMESTAMP_NOT_CONFIGURED);
1507
            code = taos_errno(res);
784✔
1508
            trying = stbInfo->keep_trying;
782✔
1509
            while (code && trying && !g_arguments->terminate) {
782✔
1510
                taos_free_result(res);
×
1511
                infoPrint("will sleep %"PRIu32" milliseconds then re-insert\n",
×
1512
                          trying_interval);
1513
                toolsMsleep(trying_interval);
×
1514
                res = taos_schemaless_insert(
×
1515
                        pThreadInfo->conn->taos, pThreadInfo->lines,
×
1516
                        (TSDB_SML_JSON_PROTOCOL == protocol
1517
                            || SML_JSON_TAOS_FORMAT == protocol)
×
1518
                            ? 0 : k,
1519
                        (SML_JSON_TAOS_FORMAT == protocol)
1520
                            ? TSDB_SML_JSON_PROTOCOL : protocol,
1521
                        (TSDB_SML_LINE_PROTOCOL == protocol)
1522
                            ? database->sml_precision
1523
                            : TSDB_SML_TIMESTAMP_NOT_CONFIGURED);
1524
                code = taos_errno(res);
×
1525
                if (trying != -1) {
×
1526
                    trying--;
×
1527
                }
1528
            }
1529

1530
            if (code != TSDB_CODE_SUCCESS && !g_arguments->terminate) {
782✔
1531
                debugPrint("Failed to execute "
×
1532
                           "schemaless insert content: %s\n\n",
1533
                        pThreadInfo->lines?(pThreadInfo->lines[0]?
1534
                            pThreadInfo->lines[0]:""):"");
1535
                errorPrint(
×
1536
                    "failed to execute schemaless insert. "
1537
                        "code: 0x%08x reason: %s\n\n",
1538
                        code, taos_errstr(res));
1539
            }
1540
            taos_free_result(res);
782✔
1541
            break;
784✔
1542

1543
        case SML_REST_IFACE: {
159✔
1544
            if (TSDB_SML_JSON_PROTOCOL == protocol
159✔
1545
                    || SML_JSON_TAOS_FORMAT == protocol) {
114✔
1546
                code = postProceSql(pThreadInfo->lines[0], database->dbName,
46✔
1547
                                    database->precision, stbInfo->iface,
46✔
1548
                                    protocol, g_arguments->port,
46✔
1549
                                    stbInfo->tcpTransfer,
46✔
1550
                                    pThreadInfo->sockfd, pThreadInfo->filePath);
46✔
1551
            } else {
1552
                int len = 0;
113✔
1553
                for (int i = 0; i < k; i++) {
1,073✔
1554
                    if (strlen(pThreadInfo->lines[i]) != 0) {
957✔
1555
                        int n;
1556
                        if (TSDB_SML_TELNET_PROTOCOL == protocol
957✔
1557
                                && stbInfo->tcpTransfer) {
635✔
1558
                            n = snprintf(pThreadInfo->buffer + len,
314✔
1559
                                            TSDB_MAX_ALLOWED_SQL_LEN - len,
314✔
1560
                                           "put %s\n", pThreadInfo->lines[i]);
314✔
1561
                        } else {
1562
                            n = snprintf(pThreadInfo->buffer + len,
643✔
1563
                                            TSDB_MAX_ALLOWED_SQL_LEN - len,
643✔
1564
                                            "%s\n",
1565
                                           pThreadInfo->lines[i]);
643✔
1566
                        }
1567
                        if (n < 0 || n >= TSDB_MAX_ALLOWED_SQL_LEN - len) {
957✔
1568
                            errorPrint("%s() LN%d snprintf overflow on %d\n",
×
1569
                                __func__, __LINE__, i);
1570
                            break;
×
1571
                        } else {
1572
                            len += n;
960✔
1573
                        }
1574
                    } else {
1575
                        break;
×
1576
                    }
1577
                }
1578
                if (g_arguments->terminate) {
116✔
1579
                    break;
×
1580
                }
1581
                code = postProceSql(pThreadInfo->buffer, database->dbName,
116✔
1582
                        database->precision,
1583
                        stbInfo->iface, protocol,
116✔
1584
                        g_arguments->port,
116✔
1585
                        stbInfo->tcpTransfer,
116✔
1586
                        pThreadInfo->sockfd, pThreadInfo->filePath);
116✔
1587
            }
1588
            break;
160✔
1589
        }
1590
    }
1591
    return code;
48,020✔
1592
}
1593

1594
static int smartContinueIfFail(threadInfo *pThreadInfo,
×
1595
                               SChildTable *childTbl,
1596
                               char *tagData,
1597
                               int64_t i,
1598
                               char *ttl) {
1599
    SDataBase *  database = pThreadInfo->dbInfo;
×
1600
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
×
1601
    char *buffer =
1602
        benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, false);
×
1603
    snprintf(
×
1604
            buffer, TSDB_MAX_ALLOWED_SQL_LEN,
1605
            g_arguments->escape_character ?
×
1606
                "CREATE TABLE IF NOT EXISTS `%s`.`%s` USING `%s`.`%s` TAGS (%s) %s "
1607
                : "CREATE TABLE IF NOT EXISTS %s.%s USING %s.%s TAGS (%s) %s ",
1608
            database->dbName, childTbl->name, database->dbName,
1609
            stbInfo->stbName,
1610
            tagData + i * stbInfo->lenOfTags, ttl);
×
1611
    debugPrint("creating table: %s\n", buffer);
×
1612
    int ret;
1613
    if (REST_IFACE == stbInfo->iface) {
×
1614
        ret = queryDbExecRest(buffer,
×
1615
                              database->dbName,
1616
                              database->precision,
1617
                              stbInfo->iface,
×
1618
                              stbInfo->lineProtocol,
×
1619
                              stbInfo->tcpTransfer,
×
1620
                              pThreadInfo->sockfd);
1621
    } else {
1622
        ret = queryDbExecCall(pThreadInfo->conn, buffer);
×
1623
        int32_t trying = g_arguments->keep_trying;
×
1624
        while (ret && trying) {
×
1625
            infoPrint("will sleep %"PRIu32" milliseconds then "
×
1626
                      "re-create table %s\n",
1627
                      g_arguments->trying_interval, buffer);
1628
            toolsMsleep(g_arguments->trying_interval);
×
1629
            ret = queryDbExecCall(pThreadInfo->conn, buffer);
×
1630
            if (trying != -1) {
×
1631
                trying--;
×
1632
            }
1633
        }
1634
    }
1635
    tmfree(buffer);
×
1636

1637
    return ret;
×
1638
}
1639

1640
static void cleanupAndPrint(threadInfo *pThreadInfo, char *mode) {
631✔
1641
    if (pThreadInfo) {
631✔
1642
        if (pThreadInfo->json_array) {
632✔
1643
            tools_cJSON_Delete(pThreadInfo->json_array);
126✔
1644
            pThreadInfo->json_array = NULL;
126✔
1645
        }
1646
        if (0 == pThreadInfo->totalDelay) {
632✔
1647
            pThreadInfo->totalDelay = 1;
4✔
1648
        }
1649
        succPrint(
632✔
1650
            "thread[%d] %s mode, completed total inserted rows: %" PRIu64
1651
            ", %.2f records/second\n",
1652
            pThreadInfo->threadID,
1653
            mode,
1654
            pThreadInfo->totalInsertRows,
1655
            (double)(pThreadInfo->totalInsertRows /
1656
            ((double)pThreadInfo->totalDelay / 1E6)));
1657
    }
1658
}
632✔
1659

1660
static int64_t getDisorderTs(SSuperTable *stbInfo, int *disorderRange) {
101,441,039✔
1661
    int64_t disorderTs = 0;
101,441,039✔
1662
    int64_t startTimestamp = stbInfo->startTimestamp;
101,441,039✔
1663
    if (stbInfo->disorderRatio > 0) {
101,441,039✔
1664
        int rand_num = taosRandom() % 100;
320✔
1665
        if (rand_num < stbInfo->disorderRatio) {
320✔
1666
            (*disorderRange)--;
151✔
1667
            if (0 == *disorderRange) {
151✔
1668
                *disorderRange = stbInfo->disorderRange;
×
1669
            }
1670
            disorderTs = startTimestamp - *disorderRange;
151✔
1671
            debugPrint("rand_num: %d, < disorderRatio: %d, "
151✔
1672
                       "disorderTs: %"PRId64"\n",
1673
                       rand_num, stbInfo->disorderRatio,
1674
                       disorderTs);
1675
        }
1676
    }
1677
    return disorderTs;
101,334,430✔
1678
}
1679

1680
void loadChildTableInfo(threadInfo* pThreadInfo) {
649✔
1681
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
649✔
1682
    if(!g_arguments->pre_load_tb_meta) {
649✔
1683
        return ;
657✔
1684
    }
1685
    if(pThreadInfo->conn == NULL) {
×
1686
        return ;
×
1687
    }
1688

1689
    char *db    = pThreadInfo->dbInfo->dbName;
×
1690
    int64_t cnt = pThreadInfo->end_table_to - pThreadInfo->start_table_from;
×
1691

1692
    // 100k
1693
    int   bufLen = 100 * 1024;
×
1694
    char *buf    = benchCalloc(1, bufLen, false);
×
1695
    int   pos    = 0;
×
1696
    infoPrint("start load child tables(%"PRId64") info...\n", cnt);
×
1697
    int64_t start = toolsGetTimestampUs();
×
1698
    for(int64_t i = pThreadInfo->start_table_from; i < pThreadInfo->end_table_to; i++) {
×
1699
        SChildTable *childTbl = stbInfo->childTblArray[i];
×
1700
        pos += sprintf(buf + pos, ",%s.%s", db, childTbl->name);
×
1701

1702
        if(pos >= bufLen - 256 || i + 1 == pThreadInfo->end_table_to) {
×
1703
            taos_load_table_info(pThreadInfo->conn, buf);
×
1704
            pos = 0;
×
1705
        }
1706
    }
1707
    int64_t delay = toolsGetTimestampUs() - start;
×
1708
    infoPrint("end load child tables info. delay=%.2fs\n", delay/1E6);
×
1709
    pThreadInfo->totalDelay += delay;
×
1710

1711
    tmfree(buf);
×
1712
}
1713

1714
// create conn again
1715
int32_t reCreateConn(threadInfo * pThreadInfo) {
×
1716
    // single
1717
    bool single = true;
×
1718
    if (pThreadInfo->dbInfo->superTbls->size > 1) {
×
1719
        single = false;
×
1720
    }
1721

1722
    //
1723
    // retry stmt2 init 
1724
    //
1725

1726
    // stmt2 close
1727
    if (pThreadInfo->conn->stmt2) {
×
1728
        taos_stmt2_close(pThreadInfo->conn->stmt2);
×
1729
        pThreadInfo->conn->stmt2 = NULL;
×
1730
    }
1731

1732
    // retry stmt2 init , maybe success
1733
    pThreadInfo->conn->stmt2 = initStmt2(pThreadInfo->conn->taos, single);
×
1734
    if (pThreadInfo->conn->stmt2) {
×
1735
        succPrint("%s", "reCreateConn first taos_stmt2_init() success and return.\n");
×
1736
        return 0;
×
1737
    }
1738

1739
    //
1740
    // close old
1741
    //
1742
    closeBenchConn(pThreadInfo->conn);
×
1743
    pThreadInfo->conn = NULL;
×
1744

1745
    //
1746
    // create new
1747
    //
1748

1749
    // conn
1750
    pThreadInfo->conn = initBenchConn();
×
1751
    if (pThreadInfo->conn == NULL) {
×
1752
        errorPrint("%s", "reCreateConn initBenchConn failed.");
×
1753
        return -1;
×
1754
    }
1755
    // stmt2
1756
    pThreadInfo->conn->stmt2 = initStmt2(pThreadInfo->conn->taos, single);
×
1757
    if (NULL == pThreadInfo->conn->stmt2) {
×
1758
        errorPrint("reCreateConn taos_stmt2_init() failed, reason: %s\n", taos_errstr(NULL));
×
1759
        return -1;
×
1760
    } 
1761
        
1762
    succPrint("%s", "reCreateConn second taos_stmt2_init() success.\n");
×
1763
    // select db 
1764
    if (taos_select_db(pThreadInfo->conn->taos, pThreadInfo->dbInfo->dbName)) {
×
1765
        errorPrint("second taos select database(%s) failed\n", pThreadInfo->dbInfo->dbName);
×
1766
        return -1;
×
1767
    }
1768

1769
    return 0;
×
1770
}
1771

1772
// reinit
1773
int32_t reConnectStmt2(threadInfo * pThreadInfo, int32_t w) {
×
1774
    // re-create connection
1775
    int32_t code = reCreateConn(pThreadInfo);
×
1776
    if (code != 0) {
×
1777
        return code;
×
1778
    }
1779

1780
    // prepare
1781
    code = prepareStmt2(pThreadInfo->conn->stmt2, pThreadInfo->stbInfo, NULL, w);
×
1782
    if (code != 0) {
×
1783
        return code;
×
1784
    }
1785

1786
    return code;
×
1787
}
1788

1789
int32_t submitStmt2Impl(threadInfo * pThreadInfo, TAOS_STMT2_BINDV *bindv, int64_t *delay1, int64_t *delay3,
133✔
1790
                    int64_t* startTs, int64_t* endTs, uint32_t* generated) {
1791
    // call bind
1792
    int64_t start = toolsGetTimestampUs();
133✔
1793
    int32_t code = taos_stmt2_bind_param(pThreadInfo->conn->stmt2, bindv, -1);
131✔
1794
    if (code != 0) {
132✔
1795
        errorPrint("taos_stmt2_bind_param failed, reason: %s\n", taos_stmt2_error(pThreadInfo->conn->stmt2));
×
1796
        return code;
×
1797
    }
1798
    debugPrint("interlace taos_stmt2_bind_param() ok.  bindv->count=%d \n", bindv->count);
132✔
1799
    *delay1 += toolsGetTimestampUs() - start;
132✔
1800

1801
    // execute
1802
    *startTs = toolsGetTimestampUs();
133✔
1803
    code = execInsert(pThreadInfo, *generated, delay3);
133✔
1804
    *endTs = toolsGetTimestampUs();
133✔
1805
    return code;
130✔
1806
}
1807

1808
int32_t submitStmt2(threadInfo * pThreadInfo, TAOS_STMT2_BINDV *bindv, int64_t *delay1, int64_t *delay3,
133✔
1809
                    int64_t* startTs, int64_t* endTs, uint32_t* generated, int32_t w) {
1810
    // calc loop
1811
    int32_t loop = 1;
133✔
1812
    SSuperTable* stbInfo = pThreadInfo->stbInfo;
133✔
1813
    if(stbInfo->continueIfFail == YES_IF_FAILED) {          
133✔
1814
        if(stbInfo->keep_trying > 1) {
×
1815
            loop = stbInfo->keep_trying;
×
1816
        } else {
1817
            loop = 3; // default
×
1818
        }
1819
    }
1820

1821
    // submit stmt2
1822
    int32_t i = 0;
133✔
1823
    bool connected = true;
133✔
1824
    while (1) {
×
1825
        int32_t code = -1;
133✔
1826
        if(connected) {
133✔
1827
            // reinit success to do submit
1828
            code = submitStmt2Impl(pThreadInfo, bindv, delay1, delay3, startTs, endTs, generated);
133✔
1829
        }
1830

1831
        // check code
1832
        if ( code == 0) {
133✔
1833
            // success
1834
            break;
133✔
1835
        } else {
1836
            // failed to try
NEW
1837
            if (--loop == 0) {
×
1838
                // failed finally
NEW
1839
                char tip[64] = "";
×
NEW
1840
                if (i > 0) {
×
NEW
1841
                    snprintf(tip, sizeof(tip), " after retry %d", i);
×
1842
                }
NEW
1843
                errorPrint("finally faild execute submitStmt2()%s\n", tip);
×
UNCOV
1844
                return -1;
×
1845
            }
1846

1847
            // wait a memont for trying
1848
            toolsMsleep(stbInfo->trying_interval);
×
1849
            // reinit
1850
            infoPrint("stmt2 start retry submit i=%d  after sleep %d ms...\n", i++, stbInfo->trying_interval);
×
1851
            code = reConnectStmt2(pThreadInfo, w);
×
1852
            if (code != 0) {
×
1853
                // faild and try again
1854
                errorPrint("faild reConnectStmt2 and retry again for next i=%d \n", i);
×
1855
                connected = false;
×
1856
            } else {
1857
                // succ 
1858
                connected = true;
×
1859
            }
1860
        }
1861
    }
1862
    
1863
    // success
1864
    return 0;
133✔
1865
}
1866

1867
static void *syncWriteInterlace(void *sarg) {
67✔
1868
    threadInfo * pThreadInfo = (threadInfo *)sarg;
67✔
1869
    SDataBase *  database = pThreadInfo->dbInfo;
67✔
1870
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
67✔
1871
    infoPrint(
67✔
1872
              "thread[%d] start interlace inserting into table from "
1873
              "%" PRIu64 " to %" PRIu64 "\n",
1874
              pThreadInfo->threadID, pThreadInfo->start_table_from,
1875
              pThreadInfo->end_table_to);
1876

1877
    int64_t insertRows = stbInfo->insertRows;
68✔
1878
    int32_t interlaceRows = stbInfo->interlaceRows;
68✔
1879
    uint32_t nBatchTable  = g_arguments->reqPerReq / interlaceRows;
68✔
1880
    uint64_t   lastPrintTime = toolsGetTimestampMs();
68✔
1881
    uint64_t   lastTotalInsertRows = 0;
68✔
1882
    int64_t   startTs = toolsGetTimestampUs();
68✔
1883
    int64_t   endTs;
1884
    uint64_t   tableSeq = pThreadInfo->start_table_from;
68✔
1885
    int disorderRange = stbInfo->disorderRange;
68✔
1886
    int32_t i = 0;
68✔
1887

1888
    loadChildTableInfo(pThreadInfo);
68✔
1889
    // check if filling back mode
1890
    bool fillBack = false;
68✔
1891
    if(stbInfo->useNow && stbInfo->startFillbackTime) {
68✔
1892
        fillBack = true;
×
1893
        pThreadInfo->start_time = stbInfo->startFillbackTime;
×
1894
        infoPrint("start time change to startFillbackTime = %"PRId64" \n", pThreadInfo->start_time);
×
1895
    }
1896

1897
    FILE* csvFile = NULL;
68✔
1898
    char* tagData = NULL;
68✔
1899
    int   w       = 0; // record tags position, if w > TAG_BATCH_COUNT , need recreate new tag values
68✔
1900
    if (stbInfo->autoTblCreating) {
68✔
1901
        csvFile = openTagCsv(stbInfo);
5✔
1902
        tagData = benchCalloc(TAG_BATCH_COUNT, stbInfo->lenOfTags, false);
5✔
1903
    }
1904
    int64_t delay1 = 0;
68✔
1905
    int64_t delay2 = 0;
68✔
1906
    int64_t delay3 = 0;
68✔
1907
    bool    firstInsertTb = true;
68✔
1908

1909
    TAOS_STMT2_BINDV *bindv = NULL;
68✔
1910

1911
    // create bindv
1912
    if(stbInfo->iface == STMT2_IFACE) {
68✔
1913
        int32_t tagCnt = stbInfo->autoTblCreating ? stbInfo->tags->size : 0; // todo
×
1914
        //int32_t tagCnt = stbInfo->tags->size;
1915
        bindv = createBindV(nBatchTable,  tagCnt, stbInfo->cols->size + 1);
×
1916
    }
1917

1918
    bool oldInitStmt = stbInfo->autoTblCreating || database->superTbls->size > 1;
68✔
1919
    // not auto create table call once
1920
    if(stbInfo->iface == STMT_IFACE && !oldInitStmt) {
68✔
1921
        debugPrint("call prepareStmt for stable:%s\n", stbInfo->stbName);
13✔
1922
        if (prepareStmt(pThreadInfo->conn->stmt, stbInfo, tagData, w)) {
13✔
1923
            g_fail = true;
×
1924
            goto free_of_interlace;
×
1925
        }
1926
    }
1927
    else if (stbInfo->iface == STMT2_IFACE) {
55✔
1928
        // only prepare once
1929
        if (prepareStmt2(pThreadInfo->conn->stmt2, stbInfo, NULL, w)) {
×
1930
            g_fail = true;
×
1931
            goto free_of_interlace;
×
1932
        }
1933
    }    
1934

1935
    while (insertRows > 0) {
3,311✔
1936
        int64_t tmp_total_insert_rows = 0;
3,254✔
1937
        uint32_t generated = 0;
3,254✔
1938
        if (insertRows <= interlaceRows) {
3,254✔
1939
            interlaceRows = insertRows;
86✔
1940
        }
1941

1942
        // loop each table
1943
        for (i = 0; i < nBatchTable; i++) {
38,760✔
1944
            if (g_arguments->terminate) {
38,861✔
1945
                goto free_of_interlace;
×
1946
            }
1947
            int64_t pos = pThreadInfo->pos;
38,861✔
1948
            
1949
            // get childTable
1950
            SChildTable *childTbl;
1951
            if (g_arguments->bind_vgroup) {
38,861✔
1952
                childTbl = pThreadInfo->vg->childTblArray[tableSeq];
36,672✔
1953
            } else {
1954
                childTbl = stbInfo->childTblArray[tableSeq];
2,189✔
1955
            }
1956

1957
            char *  tableName   = childTbl->name;
38,861✔
1958
            char *sampleDataBuf = childTbl->useOwnSample?
77,722✔
1959
                                        childTbl->sampleDataBuf:
38,861✔
1960
                                        stbInfo->sampleDataBuf;
1961
            // init ts
1962
            if(childTbl->ts == 0) {
38,861✔
1963
               childTbl->ts = pThreadInfo->start_time;
266✔
1964
            }
1965
            char ttl[SMALL_BUFF_LEN] = "";
38,861✔
1966
            if (stbInfo->ttl != 0) {
38,861✔
1967
                snprintf(ttl, SMALL_BUFF_LEN, "TTL %d", stbInfo->ttl);
32✔
1968
            }
1969
            switch (stbInfo->iface) {
38,861✔
1970
                case REST_IFACE:
5,075✔
1971
                case TAOSC_IFACE: {
1972
                    char escapedTbName[TSDB_TABLE_NAME_LEN+2] = "\0";
5,075✔
1973
                    if (g_arguments->escape_character) {
5,075✔
1974
                        snprintf(escapedTbName, TSDB_TABLE_NAME_LEN+2, "`%s`",
5,040✔
1975
                                tableName);
1976
                    } else {
1977
                        snprintf(escapedTbName, TSDB_TABLE_NAME_LEN+2, "%s",
35✔
1978
                                tableName);
1979
                    }
1980
                    if (i == 0) {
5,075✔
1981
                        ds_add_str(&pThreadInfo->buffer, STR_INSERT_INTO);
1,290✔
1982
                    }
1983

1984
                    // generator
1985
                    if (stbInfo->autoTblCreating && w == 0) {
5,071✔
1986
                        if(!generateTagData(stbInfo, tagData, TAG_BATCH_COUNT, csvFile, NULL)) {
1✔
1987
                            goto free_of_interlace;
×
1988
                        }
1989
                    }
1990

1991
                    // create child table
1992
                    if (stbInfo->partialColNum == stbInfo->cols->size) {
5,071✔
1993
                        if (stbInfo->autoTblCreating) {
5,039✔
1994
                            ds_add_strs(&pThreadInfo->buffer, 8,
×
1995
                                    escapedTbName,
1996
                                    " USING `",
1997
                                    stbInfo->stbName,
1998
                                    "` TAGS (",
1999
                                    tagData + stbInfo->lenOfTags * w,
×
2000
                                    ") ", ttl, " VALUES ");
2001
                        } else {
2002
                            ds_add_strs(&pThreadInfo->buffer, 2,
5,039✔
2003
                                    escapedTbName, " VALUES ");
2004
                        }
2005
                    } else {
2006
                        if (stbInfo->autoTblCreating) {
32✔
2007
                            ds_add_strs(&pThreadInfo->buffer, 10,
32✔
2008
                                        escapedTbName,
2009
                                        " (",
2010
                                        stbInfo->partialColNameBuf,
2011
                                        ") USING `",
2012
                                        stbInfo->stbName,
2013
                                        "` TAGS (",
2014
                                        tagData + stbInfo->lenOfTags * w,
32✔
2015
                                        ") ", ttl, " VALUES ");
2016
                        } else {
2017
                            ds_add_strs(&pThreadInfo->buffer, 4,
×
2018
                                        escapedTbName,
2019
                                        "(",
2020
                                        stbInfo->partialColNameBuf,
2021
                                        ") VALUES ");
2022
                        }
2023
                    }
2024

2025
                    // move next
2026
                    if (stbInfo->autoTblCreating && ++w >= TAG_BATCH_COUNT) {
5,070✔
2027
                        // reset for gen again
2028
                        w = 0;
×
2029
                    }  
2030

2031
                    // write child data with interlaceRows
2032
                    for (int64_t j = 0; j < interlaceRows; j++) {
55,157✔
2033
                        int64_t disorderTs = getDisorderTs(stbInfo,
50,087✔
2034
                                &disorderRange);
2035

2036
                        // change fillBack mode with condition
2037
                        if(fillBack) {
50,059✔
2038
                            int64_t tsnow = toolsGetTimestamp(database->precision);
×
2039
                            if(childTbl->ts >= tsnow){
×
2040
                                fillBack = false;
×
2041
                                infoPrint("fillBack mode set end. because timestamp(%"PRId64") >= now(%"PRId64")\n", childTbl->ts, tsnow);
×
2042
                            }
2043
                        }
2044

2045
                        // timestamp         
2046
                        char time_string[BIGINT_BUFF_LEN];
2047
                        if(stbInfo->useNow && stbInfo->interlaceRows == 1 && !fillBack) {
50,051✔
2048
                            int64_t now = toolsGetTimestamp(database->precision);
×
2049
                            snprintf(time_string, BIGINT_BUFF_LEN, "%"PRId64"", now);
×
2050
                        } else {
2051
                            snprintf(time_string, BIGINT_BUFF_LEN, "%"PRId64"",
50,051✔
2052
                                    disorderTs?disorderTs:childTbl->ts);
2053
                        }
2054

2055
                        // combine rows timestamp | other cols = sampleDataBuf[pos]
2056
                        if(stbInfo->useSampleTs) {
50,051✔
2057
                            ds_add_strs(&pThreadInfo->buffer, 3, "(", 
×
2058
                                        sampleDataBuf + pos * stbInfo->lenOfCols, ") ");
×
2059
                        } else {
2060
                            ds_add_strs(&pThreadInfo->buffer, 5, "(", time_string, ",",
50,051✔
2061
                                        sampleDataBuf + pos * stbInfo->lenOfCols, ") ");
50,051✔
2062
                        }
2063
                        // check buffer enough
2064
                        if (ds_len(pThreadInfo->buffer)
50,085✔
2065
                                > stbInfo->max_sql_len) {
50,097✔
2066
                            errorPrint("sql buffer length (%"PRIu64") "
×
2067
                                    "is larger than max sql length "
2068
                                    "(%"PRId64")\n",
2069
                                    ds_len(pThreadInfo->buffer),
2070
                                    stbInfo->max_sql_len);
2071
                            goto free_of_interlace;
×
2072
                        }
2073

2074
                        // move next
2075
                        generated++;
50,097✔
2076
                        pos++;
50,097✔
2077
                        if (pos >= g_arguments->prepared_rand) {
50,097✔
2078
                            pos = 0;
47✔
2079
                        }
2080
                        if(stbInfo->primary_key)
50,097✔
2081
                            debugPrint("add child=%s %"PRId64" pk cur=%d cnt=%d \n", childTbl->name, childTbl->ts, childTbl->pkCur, childTbl->pkCnt);
×
2082

2083
                        // primary key
2084
                        if (!stbInfo->primary_key || needChangeTs(stbInfo, &childTbl->pkCur, &childTbl->pkCnt)) {
50,097✔
2085
                            childTbl->ts += stbInfo->timestamp_step;
50,093✔
2086
                            if(stbInfo->primary_key)
50,093✔
2087
                                debugPrint("changedTs child=%s %"PRId64" pk cur=%d cnt=%d \n", childTbl->name, childTbl->ts, childTbl->pkCur, childTbl->pkCnt);
×
2088
                        }
2089
                        
2090
                    }
2091
                    break;
5,070✔
2092
                }
2093
                case STMT_IFACE: {
33,610✔
2094
                    char escapedTbName[TSDB_TABLE_NAME_LEN+2] = "\0";
33,610✔
2095
                    if (g_arguments->escape_character) {
33,610✔
2096
                        snprintf(escapedTbName, TSDB_TABLE_NAME_LEN+2,
1,694✔
2097
                                "`%s`", tableName);
2098
                    } else {
2099
                        snprintf(escapedTbName, TSDB_TABLE_NAME_LEN, "%s",
31,916✔
2100
                                tableName);
2101
                    }
2102

2103
                    // generator
2104
                    if (stbInfo->autoTblCreating && w == 0) {
33,610✔
2105
                        if(!generateTagData(stbInfo, tagData, TAG_BATCH_COUNT, csvFile, NULL)) {
×
2106
                            goto free_of_interlace;
×
2107
                        }
2108
                    }
2109
                    
2110
                    // old must call prepareStmt for each table
2111
                    if (oldInitStmt) {
33,610✔
2112
                        debugPrint("call prepareStmt for stable:%s\n", stbInfo->stbName);
10✔
2113
                        if (prepareStmt(pThreadInfo->conn->stmt, stbInfo, tagData, w)) {
10✔
2114
                            g_fail = true;
×
2115
                            goto free_of_interlace;
×
2116
                        }
2117
                    }
2118
      
2119
                    int64_t start = toolsGetTimestampUs();
33,610✔
2120
                    if (taos_stmt_set_tbname(pThreadInfo->conn->stmt,
33,373✔
2121
                                             escapedTbName)) {
2122
                        errorPrint(
×
2123
                            "taos_stmt_set_tbname(%s) failed, reason: %s\n",
2124
                            tableName,
2125
                                taos_stmt_errstr(pThreadInfo->conn->stmt));
2126
                        g_fail = true;
×
2127
                        goto free_of_interlace;
×
2128
                    }
2129
                    delay1 += toolsGetTimestampUs() - start;
33,463✔
2130

2131
                    int32_t n = 0;
33,464✔
2132
                    generated += bindParamBatch(pThreadInfo, interlaceRows,
33,464✔
2133
                                       childTbl->ts, pos, childTbl, &childTbl->pkCur, &childTbl->pkCnt, &n, &delay2, &delay3);
2134
                    
2135
                    // move next
2136
                    pos += interlaceRows;
33,451✔
2137
                    if (pos + interlaceRows + 1 >= g_arguments->prepared_rand) {
33,451✔
2138
                        pos = 0;
17✔
2139
                    }
2140
                    childTbl->ts += stbInfo->timestamp_step * n;
33,451✔
2141

2142
                    // move next
2143
                    if (stbInfo->autoTblCreating) {
33,451✔
2144
                        w += 1;
×
2145
                        if (w >= TAG_BATCH_COUNT) {
×
2146
                            // reset for gen again
2147
                            w = 0;
×
2148
                        }
2149
                    }
2150

2151
                    break;
33,451✔
2152
                }
2153
                case STMT2_IFACE: {
×
2154
                    // tbnames
2155
                    bindv->tbnames[i] = childTbl->name;
×
2156

2157
                    // tags
2158
                    if (stbInfo->autoTblCreating) {
×
2159
                        // create
2160
                        if (w == 0) {
×
2161
                            // recreate sample tags
2162
                            if(!generateTagData(stbInfo, tagData, TAG_BATCH_COUNT, csvFile, pThreadInfo->tagsStmt)) {
×
2163
                                goto free_of_interlace;
×
2164
                            }
2165
                        }
2166

2167
                        // first insert table need bring tags
2168
                        if (firstInsertTb) {
×
2169
                            bindVTags(bindv, i, w, pThreadInfo->tagsStmt);
×
2170
                        }
2171
                    } else {
2172
                        // if engine fix must bind tag bug , need remove this code
2173
                        //if (firstInsertTb) {
2174
                        //    bindVTags(bindv, i, 0, stbInfo->tags);
2175
                        //}
2176
                    }
2177

2178
                    // cols
2179
                    int32_t n = 0;
×
2180
                    generated += bindVColsInterlace(bindv, i, pThreadInfo, interlaceRows, childTbl->ts, pos, 
×
2181
                                                    childTbl, &childTbl->pkCur, &childTbl->pkCnt, &n);                    
2182
                    // move next
2183
                    pos += interlaceRows;
×
2184
                    if (pos + interlaceRows + 1 >= g_arguments->prepared_rand) {
×
2185
                        pos = 0;
×
2186
                    }
2187
                    childTbl->ts += stbInfo->timestamp_step * n;
×
2188
                    if (stbInfo->autoTblCreating) {
×
2189
                        w += 1;
×
2190
                        if (w >= TAG_BATCH_COUNT) {
×
2191
                            // reset for gen again
2192
                            w = 0;
×
2193
                        }
2194
                    }
2195

2196
                    break;
×
2197
                }
2198
                case SML_REST_IFACE:
358✔
2199
                case SML_IFACE: {
2200
                    int protocol = stbInfo->lineProtocol;
358✔
2201
                    for (int64_t j = 0; j < interlaceRows; j++) {
1,674✔
2202
                        int64_t disorderTs = getDisorderTs(stbInfo,
1,316✔
2203
                                &disorderRange);
2204
                        if (TSDB_SML_JSON_PROTOCOL == protocol) {
1,316✔
2205
                            tools_cJSON *tag = tools_cJSON_Duplicate(
480✔
2206
                                tools_cJSON_GetArrayItem(
480✔
2207
                                    pThreadInfo->sml_json_tags,
480✔
2208
                                    (int)tableSeq -
480✔
2209
                                        pThreadInfo->start_table_from),
480✔
2210
                                    true);
2211
                            generateSmlJsonCols(
480✔
2212
                                pThreadInfo->json_array, tag, stbInfo,
2213
                                database->sml_precision,
480✔
2214
                                    disorderTs?disorderTs:childTbl->ts);
2215
                        } else if (SML_JSON_TAOS_FORMAT == protocol) {
836✔
2216
                            tools_cJSON *tag = tools_cJSON_Duplicate(
×
2217
                                tools_cJSON_GetArrayItem(
×
2218
                                    pThreadInfo->sml_json_tags,
×
2219
                                    (int)tableSeq -
×
2220
                                        pThreadInfo->start_table_from),
×
2221
                                    true);
2222
                            generateSmlTaosJsonCols(
×
2223
                                pThreadInfo->json_array, tag, stbInfo,
2224
                                database->sml_precision,
×
2225
                                disorderTs?disorderTs:childTbl->ts);
2226
                        } else if (TSDB_SML_LINE_PROTOCOL == protocol) {
836✔
2227
                            snprintf(
356✔
2228
                                pThreadInfo->lines[generated],
356✔
2229
                                stbInfo->lenOfCols + stbInfo->lenOfTags,
356✔
2230
                                "%s %s %" PRId64 "",
2231
                                pThreadInfo
2232
                                    ->sml_tags[(int)tableSeq -
356✔
2233
                                               pThreadInfo->start_table_from],
356✔
2234
                                    sampleDataBuf + pos * stbInfo->lenOfCols,
356✔
2235
                                disorderTs?disorderTs:childTbl->ts);
2236
                        } else {
2237
                            snprintf(
480✔
2238
                                pThreadInfo->lines[generated],
480✔
2239
                                stbInfo->lenOfCols + stbInfo->lenOfTags,
480✔
2240
                                "%s %" PRId64 " %s %s", stbInfo->stbName,
2241
                                disorderTs?disorderTs:childTbl->ts,
2242
                                    sampleDataBuf + pos * stbInfo->lenOfCols,
480✔
2243
                                pThreadInfo
2244
                                    ->sml_tags[(int)tableSeq -
480✔
2245
                                               pThreadInfo->start_table_from]);
480✔
2246
                        }
2247
                        generated++;
1,316✔
2248
                        // primary key
2249
                        if (!stbInfo->primary_key || needChangeTs(stbInfo, &childTbl->pkCur, &childTbl->pkCnt)) {
1,316✔
2250
                            childTbl->ts += stbInfo->timestamp_step;
1,316✔
2251
                        }
2252
                    }
2253
                    if (TSDB_SML_JSON_PROTOCOL == protocol
358✔
2254
                            || SML_JSON_TAOS_FORMAT == protocol) {
190✔
2255
                        pThreadInfo->lines[0] =
168✔
2256
                            tools_cJSON_PrintUnformatted(
168✔
2257
                                pThreadInfo->json_array);
168✔
2258
                    }
2259
                    break;
358✔
2260
                }
2261
            }
2262

2263
            // move to next table in one batch
2264
            tableSeq++;
38,697✔
2265
            tmp_total_insert_rows += interlaceRows;
38,697✔
2266
            if (tableSeq > pThreadInfo->end_table_to) {
38,697✔
2267
                // first insert tables loop is end
2268
                firstInsertTb = false;
3,191✔
2269
                // one tables loop timestamp and pos add 
2270
                tableSeq = pThreadInfo->start_table_from;
3,191✔
2271
                // save    
2272
                pThreadInfo->pos = pos;    
3,191✔
2273
                if (!stbInfo->non_stop) {
3,191✔
2274
                    insertRows -= interlaceRows;
3,191✔
2275
                }
2276

2277
                // if fillBack mode , can't sleep
2278
                if (stbInfo->insert_interval > 0 && !fillBack) {
3,191✔
2279
                    debugPrint("%s() LN%d, insert_interval: %"PRIu64"\n",
35✔
2280
                          __func__, __LINE__, stbInfo->insert_interval);
2281
                    perfPrint("sleep %" PRIu64 " ms\n",
35✔
2282
                                     stbInfo->insert_interval);
2283
                    toolsMsleep((int32_t)stbInfo->insert_interval);
35✔
2284
                }
2285

2286
                i++;
3,190✔
2287
                // rectify bind count
2288
                if (bindv && bindv->count != i) {
3,190✔
2289
                    bindv->count = i;
×
2290
                }                
2291
                break;
3,190✔
2292
            }
2293
        }
2294

2295
        // exec
2296
        if(stbInfo->iface == STMT2_IFACE) {
3,089✔
2297
            // exec stmt2
2298
            if(g_arguments->debug_print)
×
2299
                showBindV(bindv, stbInfo->tags, stbInfo->cols);
×
2300
            // bind & exec stmt2
2301
            if (submitStmt2(pThreadInfo, bindv, &delay1, &delay3, &startTs, &endTs, &generated, w) != 0) {
×
2302
                g_fail = true;
×
2303
                goto free_of_interlace;
×
2304
            }
2305
        } else {
2306
            // exec other
2307
            startTs = toolsGetTimestampUs();
3,089✔
2308
            if (execInsert(pThreadInfo, generated, &delay3)) {
3,254✔
2309
                g_fail = true;
×
2310
                goto free_of_interlace;
×
2311
            }
2312
            endTs = toolsGetTimestampUs();
3,254✔
2313
        }
2314

2315
        debugPrint("execInsert tableIndex=%d left insert rows=%"PRId64" generated=%d\n", i, insertRows, generated);
3,244✔
2316
                
2317
        // reset count
2318
        if(bindv) {
3,239✔
2319
            bindv->count = 0;
×
2320
        }            
2321

2322
        pThreadInfo->totalInsertRows += tmp_total_insert_rows;
3,239✔
2323

2324
        if (g_arguments->terminate) {
3,239✔
2325
            goto free_of_interlace;
1✔
2326
        }
2327

2328
        int protocol = stbInfo->lineProtocol;
3,238✔
2329
        switch (stbInfo->iface) {
3,238✔
2330
            case TAOSC_IFACE:
1,289✔
2331
            case REST_IFACE:
2332
                debugPrint("pThreadInfo->buffer: %s\n",
1,289✔
2333
                           pThreadInfo->buffer);
2334
                free_ds(&pThreadInfo->buffer);
1,289✔
2335
                pThreadInfo->buffer = new_ds(0);
1,290✔
2336
                break;
1,290✔
2337
            case SML_REST_IFACE:
92✔
2338
                memset(pThreadInfo->buffer, 0,
92✔
2339
                       g_arguments->reqPerReq * (pThreadInfo->max_sql_len + 1));
92✔
2340
            case SML_IFACE:
217✔
2341
                if (TSDB_SML_JSON_PROTOCOL == protocol
217✔
2342
                        || SML_JSON_TAOS_FORMAT == protocol) {
118✔
2343
                    debugPrint("pThreadInfo->lines[0]: %s\n",
99✔
2344
                               pThreadInfo->lines[0]);
2345
                    if (pThreadInfo->json_array && !g_arguments->terminate) {
99✔
2346
                        tools_cJSON_Delete(pThreadInfo->json_array);
83✔
2347
                        pThreadInfo->json_array = NULL;
84✔
2348
                    }
2349
                    pThreadInfo->json_array = tools_cJSON_CreateArray();
100✔
2350
                    if (pThreadInfo->lines && pThreadInfo->lines[0]) {
84✔
2351
                        tmfree(pThreadInfo->lines[0]);
84✔
2352
                        pThreadInfo->lines[0] = NULL;
84✔
2353
                    }
2354
                } else {
2355
                    for (int j = 0; j < generated; j++) {
951✔
2356
                        if (pThreadInfo && pThreadInfo->lines
832✔
2357
                                && !g_arguments->terminate) {
831✔
2358
                            debugPrint("pThreadInfo->lines[%d]: %s\n", j,
831✔
2359
                                       pThreadInfo->lines[j]);
2360
                            memset(pThreadInfo->lines[j], 0,
832✔
2361
                                   pThreadInfo->max_sql_len);
2362
                        }
2363
                    }
2364
                }
2365
                break;
203✔
2366
            case STMT_IFACE:
1,749✔
2367
                break;
1,749✔
2368
        }
2369

2370
        int64_t delay4 = endTs - startTs;
3,225✔
2371
        int64_t delay = delay1 + delay2 + delay3 + delay4;
3,225✔
2372
        if (delay <=0) {
3,225✔
2373
            debugPrint("thread[%d]: startTS: %"PRId64", endTS: %"PRId64"\n",
×
2374
                       pThreadInfo->threadID, startTs, endTs);
2375
        } else {
2376
            perfPrint("insert execution time is %10.2f ms\n",
3,225✔
2377
                      delay / 1E6);
2378

2379
            int64_t * pdelay = benchCalloc(1, sizeof(int64_t), false);
3,225✔
2380
            *pdelay = delay;
3,252✔
2381
            if (benchArrayPush(pThreadInfo->delayList, pdelay) == NULL) {
3,252✔
2382
                tmfree(pdelay);
×
2383
            }
2384
            pThreadInfo->totalDelay += delay;
3,237✔
2385
            pThreadInfo->totalDelay1 += delay1;
3,237✔
2386
            pThreadInfo->totalDelay2 += delay2;
3,237✔
2387
            pThreadInfo->totalDelay3 += delay3;
3,237✔
2388
        }
2389
        delay1 = delay2 = delay3 = 0;
3,237✔
2390

2391
        int64_t currentPrintTime = toolsGetTimestampMs();
3,237✔
2392
        if (currentPrintTime - lastPrintTime > 30 * 1000) {
3,244✔
2393
            infoPrint(
×
2394
                    "thread[%d] has currently inserted rows: %" PRIu64
2395
                    ", peroid insert rate: %.3f rows/s \n",
2396
                    pThreadInfo->threadID, pThreadInfo->totalInsertRows,
2397
                    (double)(pThreadInfo->totalInsertRows - lastTotalInsertRows) * 1000.0/(currentPrintTime - lastPrintTime));
2398
            lastPrintTime = currentPrintTime;
×
2399
                lastTotalInsertRows = pThreadInfo->totalInsertRows;
×
2400
        }
2401
    }
2402

2403
free_of_interlace:
57✔
2404
    cleanupAndPrint(pThreadInfo, "interlace");
58✔
2405
    if(csvFile) {
68✔
2406
        fclose(csvFile);
×
2407
    }
2408
    tmfree(tagData);
68✔
2409
    freeBindV(bindv);
68✔
2410
    return NULL;
68✔
2411
}
2412

2413
static int32_t prepareProgressDataStmt(
224✔
2414
        threadInfo *pThreadInfo,
2415
        SChildTable *childTbl,
2416
        int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt, int64_t *delay1, int64_t *delay2, int64_t *delay3) {
2417
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
224✔
2418
    char escapedTbName[TSDB_TABLE_NAME_LEN + 2] = "\0";
224✔
2419
    if (g_arguments->escape_character) {
224✔
2420
        snprintf(escapedTbName, TSDB_TABLE_NAME_LEN + 2,
32✔
2421
                 "`%s`", childTbl->name);
2422
    } else {
2423
        snprintf(escapedTbName, TSDB_TABLE_NAME_LEN, "%s",
192✔
2424
                 childTbl->name);
2425
    }
2426
    int64_t start = toolsGetTimestampUs();
224✔
2427
    if (taos_stmt_set_tbname(pThreadInfo->conn->stmt,
223✔
2428
                             escapedTbName)) {
2429
        errorPrint(
×
2430
                "taos_stmt_set_tbname(%s) failed,"
2431
                "reason: %s\n", escapedTbName,
2432
                taos_stmt_errstr(pThreadInfo->conn->stmt));
2433
        return -1;
×
2434
    }
2435
    *delay1 = toolsGetTimestampUs() - start;
225✔
2436
    int32_t n = 0;
223✔
2437
    int64_t pos = i % g_arguments->prepared_rand;
223✔
2438
    if (g_arguments->prepared_rand - pos < g_arguments->reqPerReq) {
223✔
2439
        // remain prepare data less than batch, reset pos to zero
2440
        pos = 0;
×
2441
    }
2442
    int32_t generated = bindParamBatch(
447✔
2443
            pThreadInfo,
2444
            (g_arguments->reqPerReq > (stbInfo->insertRows - i))
223✔
2445
                ? (stbInfo->insertRows - i)
2446
                : g_arguments->reqPerReq,
223✔
2447
            *timestamp, pos, childTbl, pkCur, pkCnt, &n, delay2, delay3);
2448
    *timestamp += n * stbInfo->timestamp_step;
224✔
2449
    return generated;
224✔
2450
}
2451

2452
static void makeTimestampDisorder(
×
2453
        int64_t *timestamp, SSuperTable *stbInfo) {
2454
    int64_t startTimestamp = stbInfo->startTimestamp;
×
2455
    int disorderRange = stbInfo->disorderRange;
×
2456
    int rand_num = taosRandom() % 100;
×
2457
    if (rand_num < stbInfo->disorderRatio) {
×
2458
        disorderRange--;
×
2459
        if (0 == disorderRange) {
×
2460
            disorderRange = stbInfo->disorderRange;
×
2461
        }
2462
        *timestamp = startTimestamp - disorderRange;
×
2463
        debugPrint("rand_num: %d, < disorderRatio: %d"
×
2464
                   ", ts: %"PRId64"\n",
2465
                   rand_num,
2466
                   stbInfo->disorderRatio,
2467
                   *timestamp);
2468
    }
2469
}
×
2470

2471
static int32_t prepareProgressDataSmlJsonText(
291✔
2472
    threadInfo *pThreadInfo,
2473
    uint64_t tableSeq,
2474
    int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt) {
2475
    // prepareProgressDataSmlJsonText
2476
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
291✔
2477
    int32_t generated = 0;
291✔
2478

2479
    int len = 0;
291✔
2480

2481
    char *line = pThreadInfo->lines[0];
291✔
2482
    uint32_t line_buf_len = pThreadInfo->line_buf_len;
291✔
2483

2484
    strncat(line + len, "[", 2);
291✔
2485
    len += 1;
291✔
2486

2487
    int32_t pos = 0;
291✔
2488
    for (int j = 0; (j < g_arguments->reqPerReq)
291✔
2489
            && !g_arguments->terminate; j++) {
3,027✔
2490
        strncat(line + len, "{", 2);
2,884✔
2491
        len += 1;
2,884✔
2492
        int n;
2493
        n = snprintf(line + len, line_buf_len - len,
2,884✔
2494
                 "\"timestamp\":%"PRId64",", *timestamp);
2495
        if (n < 0 || n >= line_buf_len - len) {
2,884✔
2496
            errorPrint("%s() LN%d snprintf overflow on %d\n",
×
2497
                       __func__, __LINE__, j);
2498
            return -1;
×
2499
        } else {
2500
            len += n;
2,889✔
2501
        }
2502

2503
        n = snprintf(line + len, line_buf_len - len, "%s",
2,889✔
2504
                        pThreadInfo->sml_json_value_array[tableSeq]);
2,889✔
2505
        if (n < 0 || n >= line_buf_len - len) {
2,889✔
2506
            errorPrint("%s() LN%d snprintf overflow on %d\n",
2✔
2507
                       __func__, __LINE__, j);
2508
            return -1;
×
2509
        } else {
2510
            len += n;
2,887✔
2511
        }
2512
        n = snprintf(line + len, line_buf_len - len, "\"tags\":%s,",
2,887✔
2513
                       pThreadInfo->sml_tags_json_array[tableSeq]);
2,887✔
2514
        if (n < 0 || n >= line_buf_len - len) {
2,887✔
2515
            errorPrint("%s() LN%d snprintf overflow on %d\n",
×
2516
                       __func__, __LINE__, j);
2517
            return -1;
×
2518
        } else {
2519
            len += n;
2,888✔
2520
        }
2521
        n = snprintf(line + len, line_buf_len - len,
2,888✔
2522
                       "\"metric\":\"%s\"}", stbInfo->stbName);
2523
        if (n < 0 || n >= line_buf_len - len) {
2,888✔
UNCOV
2524
            errorPrint("%s() LN%d snprintf overflow on %d\n",
×
2525
                       __func__, __LINE__, j);
2526
            return -1;
×
2527
        } else {
2528
            len += n;
2,888✔
2529
        }
2530

2531
        pos++;
2,888✔
2532
        if (pos >= g_arguments->prepared_rand) {
2,888✔
2533
            pos = 0;
288✔
2534
        }
2535

2536
        // primay key repeat ts count
2537
        if (!stbInfo->primary_key || needChangeTs(stbInfo, pkCur, pkCnt)) {
2,888✔
2538
            *timestamp += stbInfo->timestamp_step;
2,886✔
2539
        }
2540

2541
        if (stbInfo->disorderRatio > 0) {
2,888✔
2542
            makeTimestampDisorder(timestamp, stbInfo);
×
2543
        }
2544
        generated++;
2,883✔
2545
        if (i + generated >= stbInfo->insertRows) {
2,883✔
2546
            break;
147✔
2547
        }
2548
        if ((j+1) < g_arguments->reqPerReq) {
2,736✔
2549
            strncat(line + len, ",", 2);
2,595✔
2550
            len += 1;
2,595✔
2551
        }
2552
    }
2553
    strncat(line + len, "]", 2);
290✔
2554

2555
    debugPrint("%s() LN%d, lines[0]: %s\n",
290✔
2556
               __func__, __LINE__, pThreadInfo->lines[0]);
2557
    return generated;
291✔
2558
}
2559

2560
static int32_t prepareProgressDataSmlJson(
147✔
2561
    threadInfo *pThreadInfo,
2562
    uint64_t tableSeq,
2563
    int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt) {
2564
    // prepareProgressDataSmlJson
2565
    SDataBase *  database = pThreadInfo->dbInfo;
147✔
2566
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
147✔
2567
    int32_t generated = 0;
147✔
2568

2569
    int32_t pos = 0;
147✔
2570
    int protocol = stbInfo->lineProtocol;
147✔
2571
    for (int j = 0; (j < g_arguments->reqPerReq)
147✔
2572
            && !g_arguments->terminate; j++) {
1,523✔
2573
        tools_cJSON *tag = tools_cJSON_Duplicate(
1,451✔
2574
                tools_cJSON_GetArrayItem(
1,452✔
2575
                    pThreadInfo->sml_json_tags,
1,452✔
2576
                    (int)tableSeq -
1,452✔
2577
                    pThreadInfo->start_table_from),
1,452✔
2578
                true);
2579
        debugPrintJsonNoTime(tag);
1,451✔
2580
        if (TSDB_SML_JSON_PROTOCOL == protocol) {
1,451✔
2581
            generateSmlJsonCols(
×
2582
                    pThreadInfo->json_array, tag, stbInfo,
2583
                    database->sml_precision, *timestamp);
×
2584
        } else {
2585
            generateSmlTaosJsonCols(
1,451✔
2586
                    pThreadInfo->json_array, tag, stbInfo,
2587
                    database->sml_precision, *timestamp);
1,451✔
2588
        }
2589
        pos++;
1,452✔
2590
        if (pos >= g_arguments->prepared_rand) {
1,452✔
2591
            pos = 0;
145✔
2592
        }
2593

2594
        // primay key repeat ts count
2595
        if (!stbInfo->primary_key || needChangeTs(stbInfo, pkCur, pkCnt)) {
1,452✔
2596
            *timestamp += stbInfo->timestamp_step;
1,451✔
2597
        }
2598

2599
        if (stbInfo->disorderRatio > 0) {
1,452✔
2600
            makeTimestampDisorder(timestamp, stbInfo);
×
2601
        }
2602
        generated++;
1,451✔
2603
        if (i + generated >= stbInfo->insertRows) {
1,451✔
2604
            break;
75✔
2605
        }
2606
    }
2607

2608
    tmfree(pThreadInfo->lines[0]);
146✔
2609
    pThreadInfo->lines[0] = NULL;
147✔
2610
    pThreadInfo->lines[0] =
294✔
2611
            tools_cJSON_PrintUnformatted(
147✔
2612
                pThreadInfo->json_array);
147✔
2613
    debugPrint("pThreadInfo->lines[0]: %s\n",
147✔
2614
                   pThreadInfo->lines[0]);
2615

2616
    return generated;
147✔
2617
}
2618

2619
static int32_t prepareProgressDataSmlLineOrTelnet(
303✔
2620
    threadInfo *pThreadInfo, uint64_t tableSeq, char *sampleDataBuf,
2621
    int64_t *timestamp, uint64_t i, char *ttl, int protocol, int32_t *pkCur, int32_t *pkCnt) {
2622
    // prepareProgressDataSmlLine
2623
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
303✔
2624
    int32_t generated = 0;
303✔
2625

2626
    int32_t pos = 0;
303✔
2627
    for (int j = 0; (j < g_arguments->reqPerReq)
303✔
2628
            && !g_arguments->terminate; j++) {
92,029✔
2629
        // table index
2630
        int ti = tableSeq - pThreadInfo->start_table_from;
83,037✔
2631
        if (TSDB_SML_LINE_PROTOCOL == protocol) {
83,037✔
2632
            snprintf(
80,650✔
2633
                    pThreadInfo->lines[j],
80,650✔
2634
                    stbInfo->lenOfCols + stbInfo->lenOfTags,
80,650✔
2635
                    "%s %s %" PRId64 "",
2636
                    pThreadInfo->sml_tags[ti],
80,650✔
2637
                    sampleDataBuf + pos * stbInfo->lenOfCols,
80,650✔
2638
                    *timestamp);
2639
        } else {
2640
            snprintf(
2,387✔
2641
                    pThreadInfo->lines[j],
2,387✔
2642
                    stbInfo->lenOfCols + stbInfo->lenOfTags,
2,387✔
2643
                    "%s %" PRId64 " %s %s", stbInfo->stbName,
2644
                    *timestamp,
2645
                    sampleDataBuf
2646
                    + pos * stbInfo->lenOfCols,
2,387✔
2647
                    pThreadInfo->sml_tags[ti]);
2,387✔
2648
        }
2649
        //infoPrint("sml prepare j=%d stb=%s sml_tags=%s \n", j, stbInfo->stbName, pThreadInfo->sml_tags[ti]);
2650
        pos++;
83,037✔
2651
        if (pos >= g_arguments->prepared_rand) {
83,037✔
2652
            pos = 0;
272✔
2653
        }
2654
        // primay key repeat ts count
2655
        if (!stbInfo->primary_key || needChangeTs(stbInfo, pkCur, pkCnt)) {
83,037✔
2656
            *timestamp += stbInfo->timestamp_step;
94,064✔
2657
        }
2658
        
2659
        if (stbInfo->disorderRatio > 0) {
83,037✔
2660
            makeTimestampDisorder(timestamp, stbInfo);
×
2661
        }
2662
        generated++;
91,887✔
2663
        if (i + generated >= stbInfo->insertRows) {
91,887✔
2664
            break;
161✔
2665
        }
2666
    }
2667
    return generated;
9,153✔
2668
}
2669

2670
static int32_t prepareProgressDataSml(
742✔
2671
    threadInfo *pThreadInfo,
2672
    SChildTable *childTbl,
2673
    uint64_t tableSeq,
2674
    int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt) {
2675
    // prepareProgressDataSml
2676
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
742✔
2677

2678
    char *sampleDataBuf;
2679
    if (childTbl->useOwnSample) {
742✔
2680
        sampleDataBuf = childTbl->sampleDataBuf;
×
2681
    } else {
2682
        sampleDataBuf = stbInfo->sampleDataBuf;
742✔
2683
    }
2684
    int protocol = stbInfo->lineProtocol;
742✔
2685
    int32_t generated = -1;
742✔
2686
    switch (protocol) {
742✔
2687
        case TSDB_SML_LINE_PROTOCOL:
304✔
2688
        case TSDB_SML_TELNET_PROTOCOL:
2689
            generated = prepareProgressDataSmlLineOrTelnet(
304✔
2690
                    pThreadInfo,
2691
                    tableSeq,
2692
                    sampleDataBuf,
2693
                    timestamp, i, ttl, protocol, pkCur, pkCnt);
2694
            break;
303✔
2695
        case TSDB_SML_JSON_PROTOCOL:
290✔
2696
            generated = prepareProgressDataSmlJsonText(
290✔
2697
                    pThreadInfo,
2698
                    tableSeq - pThreadInfo->start_table_from,
290✔
2699
                timestamp, i, ttl, pkCur, pkCnt);
2700
            break;
291✔
2701
        case SML_JSON_TAOS_FORMAT:
147✔
2702
            generated = prepareProgressDataSmlJson(
147✔
2703
                    pThreadInfo,
2704
                    tableSeq,
2705
                    timestamp, i, ttl, pkCur, pkCnt);
2706
            break;
147✔
2707
        default:
1✔
2708
            errorPrint("%s() LN%d: unknown protcolor: %d\n",
1✔
2709
                       __func__, __LINE__, protocol);
2710
            break;
×
2711
    }
2712

2713
    return generated;
741✔
2714
}
2715

2716
// if return true, timestmap must add timestap_step, else timestamp no need changed
2717
bool needChangeTs(SSuperTable * stbInfo, int32_t *pkCur, int32_t *pkCnt) {
9,978✔
2718
    // check need generate cnt
2719
    if(*pkCnt == 0) {
9,978✔
2720
        if (stbInfo->repeat_ts_min >= stbInfo->repeat_ts_max) {
1,000✔
2721
            // fixed count value is max
2722
            if (stbInfo->repeat_ts_max == 0){
1,000✔
2723
                return true;
×
2724
            }
2725

2726
            *pkCnt = stbInfo->repeat_ts_max;
1,000✔
2727
        } else {
2728
            // random range
2729
            *pkCnt = RD(stbInfo->repeat_ts_max + 1);
×
2730
            if(*pkCnt < stbInfo->repeat_ts_min) {
×
2731
                *pkCnt = (*pkCnt + stbInfo->repeat_ts_min) % stbInfo->repeat_ts_max;
×
2732
            }
2733
        }
2734
    }
2735

2736
    // compare with current value
2737
    *pkCur = *pkCur + 1;
9,978✔
2738
    if(*pkCur >= *pkCnt) {
9,978✔
2739
        // reset zero
2740
        *pkCur = 0;
990✔
2741
        *pkCnt = 0;
990✔
2742
        return true;
990✔
2743
    } else {
2744
        // add one
2745
        return false;
8,988✔
2746
    }
2747
}
2748

2749
static int32_t prepareProgressDataSql(
42,912✔
2750
                    threadInfo *pThreadInfo,
2751
                    SChildTable *childTbl, 
2752
                    char* tagData,
2753
                    uint64_t tableSeq,
2754
                    char *sampleDataBuf,
2755
                    int64_t *timestamp, uint64_t i, char *ttl,
2756
                    int32_t *pos, uint64_t *len, int32_t* pkCur, int32_t* pkCnt) {
2757
    // prepareProgressDataSql
2758
    int32_t generated = 0;
42,912✔
2759
    SDataBase *database = pThreadInfo->dbInfo;
42,912✔
2760
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
42,912✔
2761
    char *  pstr = pThreadInfo->buffer;
42,912✔
2762
    int disorderRange = stbInfo->disorderRange;
42,912✔
2763

2764
    if (stbInfo->partialColNum == stbInfo->cols->size) {
42,912✔
2765
        if (stbInfo->autoTblCreating) {
42,811✔
2766
            *len =
60✔
2767
                snprintf(pstr, TSDB_MAX_ALLOWED_SQL_LEN,
60✔
2768
                        g_arguments->escape_character
60✔
2769
                        ? "%s `%s`.`%s` USING `%s`.`%s` TAGS (%s) %s VALUES "
2770
                        : "%s %s.%s USING %s.%s TAGS (%s) %s VALUES ",
2771
                         STR_INSERT_INTO, database->dbName,
2772
                         childTbl->name, database->dbName,
2773
                         stbInfo->stbName,
2774
                         tagData +
2775
                         stbInfo->lenOfTags * tableSeq, ttl);
60✔
2776
        } else {
2777
            *len = snprintf(pstr, TSDB_MAX_ALLOWED_SQL_LEN,
42,751✔
2778
                    g_arguments->escape_character
42,751✔
2779
                           ? "%s `%s`.`%s` VALUES "
2780
                           : "%s %s.%s VALUES ",
2781
                           STR_INSERT_INTO,
2782
                           database->dbName, childTbl->name);
2783
        }
2784
    } else {
2785
        if (stbInfo->autoTblCreating) {
101✔
2786
            *len = snprintf(
16✔
2787
                    pstr, TSDB_MAX_ALLOWED_SQL_LEN,
2788
                    g_arguments->escape_character
16✔
2789
                    ? "%s `%s`.`%s` (%s) USING `%s`.`%s` TAGS (%s) %s VALUES "
2790
                    : "%s %s.%s (%s) USING %s.%s TAGS (%s) %s VALUES ",
2791
                    STR_INSERT_INTO, database->dbName,
2792
                    childTbl->name,
2793
                    stbInfo->partialColNameBuf,
2794
                    database->dbName, stbInfo->stbName,
2795
                    tagData +
2796
                    stbInfo->lenOfTags * tableSeq, ttl);
16✔
2797
        } else {
2798
            *len = snprintf(pstr, TSDB_MAX_ALLOWED_SQL_LEN,
85✔
2799
                    g_arguments->escape_character
85✔
2800
                    ? "%s `%s`.`%s` (%s) VALUES "
2801
                    : "%s %s.%s (%s) VALUES ",
2802
                    STR_INSERT_INTO, database->dbName,
2803
                    childTbl->name,
2804
                    stbInfo->partialColNameBuf);
2805
        }
2806
    }
2807

2808
    char *ownSampleDataBuf;
2809
    if (childTbl->useOwnSample) {
42,912✔
2810
        debugPrint("%s is using own sample data\n",
6✔
2811
                  childTbl->name);
2812
        ownSampleDataBuf = childTbl->sampleDataBuf;
6✔
2813
    } else {
2814
        ownSampleDataBuf = stbInfo->sampleDataBuf;
42,906✔
2815
    }
2816
    for (int j = 0; j < g_arguments->reqPerReq; j++) {
101,074,761✔
2817
        if (stbInfo->useSampleTs
101,497,912✔
2818
                && (!stbInfo->random_data_source)) {
104✔
2819
            *len +=
104✔
2820
                snprintf(pstr + *len,
104✔
2821
                         TSDB_MAX_ALLOWED_SQL_LEN - *len, "(%s)",
104✔
2822
                         sampleDataBuf +
2823
                         *pos * stbInfo->lenOfCols);
104✔
2824
        } else {
2825
            int64_t disorderTs = getDisorderTs(stbInfo, &disorderRange);
101,497,808✔
2826
            *len += snprintf(pstr + *len,
101,054,682✔
2827
                            TSDB_MAX_ALLOWED_SQL_LEN - *len,
101,054,682✔
2828
                            "(%" PRId64 ",%s)",
2829
                            disorderTs?disorderTs:*timestamp,
2830
                            ownSampleDataBuf +
2831
                            *pos * stbInfo->lenOfCols);
101,054,682✔
2832
        }
2833
        *pos += 1;
101,054,786✔
2834
        if (*pos >= g_arguments->prepared_rand) {
101,054,786✔
2835
            *pos = 0;
127✔
2836
        }
2837
        // primary key
2838
        if(!stbInfo->primary_key || needChangeTs(stbInfo, pkCur, pkCnt)) {
101,054,786✔
2839
            *timestamp += stbInfo->timestamp_step;
101,345,935✔
2840
        }
2841
   
2842
        generated++;
101,054,786✔
2843
        if (*len > (TSDB_MAX_ALLOWED_SQL_LEN
101,054,786✔
2844
            - stbInfo->lenOfCols)) {
101,054,786✔
2845
            break;
11,901✔
2846
        }
2847
        if (i + generated >= stbInfo->insertRows) {
101,042,885✔
2848
            break;
11,036✔
2849
        }
2850
    }
2851

2852
    return generated;
×
2853
}
2854

2855
void *syncWriteProgressive(void *sarg) {
588✔
2856
    threadInfo * pThreadInfo = (threadInfo *)sarg;
588✔
2857
    SDataBase *  database = pThreadInfo->dbInfo;
588✔
2858
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
588✔
2859

2860
    loadChildTableInfo(pThreadInfo);
588✔
2861

2862
    // special deal flow for TAOSC_IFACE
2863
    if (insertDataMix(pThreadInfo, database, stbInfo)) {
589✔
2864
        // request be dealt by this function , so return
2865
        return NULL;
24✔
2866
    }
2867

2868
    infoPrint(
565✔
2869
        "thread[%d] start progressive inserting into table from "
2870
        "%" PRIu64 " to %" PRIu64 "\n",
2871
        pThreadInfo->threadID, pThreadInfo->start_table_from,
2872
        pThreadInfo->end_table_to + 1);
2873

2874
    uint64_t  lastPrintTime = toolsGetTimestampMs();
565✔
2875
    uint64_t  lastTotalInsertRows = 0;
565✔
2876
    int64_t   startTs = toolsGetTimestampUs();
565✔
2877
    int64_t   endTs;
2878

2879
    FILE* csvFile = NULL;
565✔
2880
    char* tagData = NULL;
565✔
2881
    bool  stmt    = (stbInfo->iface == STMT_IFACE || stbInfo->iface == STMT2_IFACE) && stbInfo->autoTblCreating;
565✔
2882
    bool  smart   = SMART_IF_FAILED == stbInfo->continueIfFail;
565✔
2883
    bool  acreate = (stbInfo->iface == TAOSC_IFACE || stbInfo->iface == REST_IFACE) && stbInfo->autoTblCreating;
565✔
2884
    int   w       = 0;
565✔
2885
    if (stmt || smart || acreate) {
565✔
2886
        csvFile = openTagCsv(stbInfo);
26✔
2887
        tagData = benchCalloc(TAG_BATCH_COUNT, stbInfo->lenOfTags, false);
26✔
2888
    }
2889

2890
    bool oldInitStmt = stbInfo->autoTblCreating || database->superTbls->size > 1;
565✔
2891
    // stmt.  not auto table create call on stmt
2892
    if (stbInfo->iface == STMT_IFACE && !oldInitStmt) {
565✔
2893
        if (prepareStmt(pThreadInfo->conn->stmt, stbInfo, tagData, w)) {
28✔
2894
            g_fail = true;
×
2895
            goto free_of_progressive;
×
2896
        }
2897
    }
2898
    else if (stbInfo->iface == STMT2_IFACE && !stbInfo->autoTblCreating) {
537✔
2899
        if (prepareStmt2(pThreadInfo->conn->stmt2, stbInfo, tagData, w)) {
12✔
2900
            g_fail = true;
×
2901
            goto free_of_progressive;
×
2902
        }
2903
    }
2904
    
2905
    //
2906
    // loop write each child table
2907
    //
2908
    for (uint64_t tableSeq = pThreadInfo->start_table_from;
565✔
2909
            tableSeq <= pThreadInfo->end_table_to; tableSeq++) {
12,043✔
2910
        char *sampleDataBuf;
2911
        SChildTable *childTbl;
2912

2913
        if (g_arguments->bind_vgroup) {
11,641✔
2914
            childTbl = pThreadInfo->vg->childTblArray[tableSeq];
×
2915
        } else {
2916
            childTbl = stbInfo->childTblArray[tableSeq];
11,641✔
2917
        }
2918
        debugPrint("tableSeq=%"PRId64" childTbl->name=%s\n", tableSeq, childTbl->name);
11,641✔
2919

2920
        if (childTbl->useOwnSample) {
11,645✔
2921
            sampleDataBuf = childTbl->sampleDataBuf;
11✔
2922
        } else {
2923
            sampleDataBuf = stbInfo->sampleDataBuf;
11,634✔
2924
        }
2925

2926
        int64_t  timestamp = pThreadInfo->start_time;
11,645✔
2927
        uint64_t len = 0;
11,645✔
2928
        int32_t pos = 0;
11,645✔
2929
        int32_t pkCur = 0; // record generate same timestamp current count
11,645✔
2930
        int32_t pkCnt = 0; // record generate same timestamp count
11,645✔
2931
        int64_t delay1 = 0;
11,645✔
2932
        int64_t delay2 = 0;
11,645✔
2933
        int64_t delay3 = 0;
11,645✔
2934

2935
        if(stmt || smart || acreate) {
11,645✔
2936
            // generator
2937
            if (w == 0) {
92✔
2938
                if(!generateTagData(stbInfo, tagData, TAG_BATCH_COUNT, csvFile, NULL)) {
26✔
2939
                    g_fail = true;
×
2940
                    goto free_of_progressive;
12✔
2941
                }
2942
            }
2943
        }
2944

2945
        // old init stmt must call for each table
2946
        if (stbInfo->iface == STMT_IFACE && oldInitStmt) {
11,645✔
2947
            if (prepareStmt(pThreadInfo->conn->stmt, stbInfo, tagData, w)) {
47✔
2948
                g_fail = true;
×
2949
                goto free_of_progressive;
×
2950
            }
2951
        }
2952
        else if (stbInfo->iface == STMT2_IFACE && stbInfo->autoTblCreating) {
11,598✔
2953
            if (prepareStmt2(pThreadInfo->conn->stmt2, stbInfo, tagData, w)) {
5✔
2954
                g_fail = true;
×
2955
                goto free_of_progressive;
×
2956
            }
2957
        }
2958
        
2959
        if(stmt || smart || acreate) {
11,645✔
2960
            // move next
2961
            if (++w >= TAG_BATCH_COUNT) {
93✔
2962
                // reset for gen again
2963
                w = 0;
×
2964
            } 
2965
        }
2966

2967
        char ttl[SMALL_BUFF_LEN] = "";
11,645✔
2968
        if (stbInfo->ttl != 0) {
11,645✔
2969
            snprintf(ttl, SMALL_BUFF_LEN, "TTL %d", stbInfo->ttl);
26✔
2970
        }
2971
        for (uint64_t i = 0; i < stbInfo->insertRows;) {
43,822✔
2972
            if (g_arguments->terminate) {
43,973✔
2973
                goto free_of_progressive;
×
2974
            }
2975
            int32_t generated = 0;
43,973✔
2976
            switch (stbInfo->iface) {
43,973✔
2977
                case TAOSC_IFACE:
42,798✔
2978
                case REST_IFACE:
2979
                    generated = prepareProgressDataSql(
42,798✔
2980
                            pThreadInfo,
2981
                            childTbl,
2982
                            tagData,
2983
                            w,
2984
                            sampleDataBuf,
2985
                            &timestamp, i, ttl, &pos, &len, &pkCur, &pkCnt);
2986
                    break;
42,877✔
2987
                case STMT_IFACE: {
225✔
2988
                    generated = prepareProgressDataStmt(
225✔
2989
                            pThreadInfo,
2990
                            childTbl, &timestamp, i, ttl, &pkCur, &pkCnt, &delay1, &delay2, &delay3);
2991
                    break;
224✔
2992
                }
2993
                case STMT2_IFACE: {
132✔
2994
                    generated = stmt2BindAndSubmit(
132✔
2995
                            pThreadInfo,
2996
                            childTbl, &timestamp, i, ttl, &pkCur, &pkCnt, &delay1,
2997
                            &delay3, &startTs, &endTs, w);
2998
                    break;
132✔
2999
                }
3000
                case SML_REST_IFACE:
742✔
3001
                case SML_IFACE:
3002
                    generated = prepareProgressDataSml(
742✔
3003
                            pThreadInfo,
3004
                            childTbl,
3005
                            tableSeq, &timestamp, i, ttl, &pkCur, &pkCnt);
3006
                    break;
742✔
3007
                default:
76✔
3008
                    break;
76✔
3009
            }
3010
            if (generated < 0) {
44,051✔
3011
                g_fail = true;
×
3012
                goto free_of_progressive;
×
3013
            }
3014
            if (!stbInfo->non_stop) {
44,051✔
3015
                i += generated;
43,927✔
3016
            }
3017

3018
            // stmt2 execInsert already execute on stmt2BindAndSubmit
3019
            if (stbInfo->iface != STMT2_IFACE) {
44,051✔
3020
                // no stmt2 exec
3021
                startTs = toolsGetTimestampUs();
43,801✔
3022
                int code = execInsert(pThreadInfo, generated, &delay3);
43,773✔
3023
                if (code) {
43,855✔
3024
                    if (NO_IF_FAILED == stbInfo->continueIfFail) {
12✔
3025
                        warnPrint("The super table parameter "
12✔
3026
                                "continueIfFail: %d, STOP insertion!\n",
3027
                                stbInfo->continueIfFail);
3028
                        g_fail = true;
12✔
3029
                        goto free_of_progressive;
12✔
3030
                    } else if (YES_IF_FAILED == stbInfo->continueIfFail) {
×
3031
                        infoPrint("The super table parameter "
×
3032
                                "continueIfFail: %d, "
3033
                                "will continue to insert ..\n",
3034
                                stbInfo->continueIfFail);
3035
                    } else if (smart) {
×
3036
                        warnPrint("The super table parameter "
×
3037
                                "continueIfFail: %d, will create table "
3038
                                "then insert ..\n",
3039
                                stbInfo->continueIfFail);
3040

3041
                        // generator
3042
                        if (w == 0) {
×
3043
                            if(!generateTagData(stbInfo, tagData, TAG_BATCH_COUNT, csvFile, NULL)) {
×
3044
                                g_fail = true;
×
3045
                                goto free_of_progressive;
×
3046
                            }
3047
                        }
3048

3049
                        code = smartContinueIfFail(
×
3050
                                pThreadInfo,
3051
                                childTbl, tagData, w, ttl);
3052
                        if (0 != code) {
×
3053
                            g_fail = true;
×
3054
                            goto free_of_progressive;
×
3055
                        }
3056

3057
                        // move next
3058
                        if (++w >= TAG_BATCH_COUNT) {
×
3059
                            // reset for gen again
3060
                            w = 0;
×
3061
                        }
3062

3063
                        code = execInsert(pThreadInfo, generated, &delay3);
×
3064
                        if (code) {
×
3065
                            g_fail = true;
×
3066
                            goto free_of_progressive;
×
3067
                        }
3068
                    } else {
3069
                        warnPrint("Unknown super table parameter "
×
3070
                                "continueIfFail: %d\n",
3071
                                stbInfo->continueIfFail);
3072
                        g_fail = true;
×
3073
                        goto free_of_progressive;
×
3074
                    }
3075
                }
3076
                endTs = toolsGetTimestampUs() + 1;
43,843✔
3077
            }
3078

3079
            if (stbInfo->insert_interval > 0) {
44,101✔
3080
                debugPrint("%s() LN%d, insert_interval: %"PRIu64"\n",
30✔
3081
                          __func__, __LINE__, stbInfo->insert_interval);
3082
                perfPrint("sleep %" PRIu64 " ms\n",
30✔
3083
                              stbInfo->insert_interval);
3084
                toolsMsleep((int32_t)stbInfo->insert_interval);
30✔
3085
            }
3086

3087
            // flush
3088
            if (database->flush) {
43,919✔
3089
                char sql[260] = "";
×
3090
                sprintf(sql, "flush database %s", database->dbName);
×
3091
                int32_t code = executeSql(pThreadInfo->conn->taos,sql);
×
3092
                if (code != 0) {
×
3093
                  perfPrint(" %s failed. error code = 0x%x\n", sql, code);
×
3094
                } else {
3095
                   perfPrint(" %s ok.\n", sql);
×
3096
                }
3097
            }
3098

3099
            pThreadInfo->totalInsertRows += generated;
43,919✔
3100

3101
            if (g_arguments->terminate) {
43,919✔
3102
                goto free_of_progressive;
×
3103
            }
3104
            int protocol = stbInfo->lineProtocol;
43,919✔
3105
            switch (stbInfo->iface) {
43,919✔
3106
                case REST_IFACE:
42,843✔
3107
                case TAOSC_IFACE:
3108
                    memset(pThreadInfo->buffer, 0, pThreadInfo->max_sql_len);
42,843✔
3109
                    break;
42,843✔
3110
                case SML_REST_IFACE:
69✔
3111
                    memset(pThreadInfo->buffer, 0,
69✔
3112
                           g_arguments->reqPerReq *
69✔
3113
                               (pThreadInfo->max_sql_len + 1));
69✔
3114
                case SML_IFACE:
741✔
3115
                    if (TSDB_SML_JSON_PROTOCOL == protocol) {
741✔
3116
                        memset(pThreadInfo->lines[0], 0,
290✔
3117
                           pThreadInfo->line_buf_len);
290✔
3118
                    } else if (SML_JSON_TAOS_FORMAT == protocol) {
451✔
3119
                        if (pThreadInfo->lines && pThreadInfo->lines[0]) {
147✔
3120
                            tmfree(pThreadInfo->lines[0]);
147✔
3121
                            pThreadInfo->lines[0] = NULL;
147✔
3122
                        }
3123
                        if (pThreadInfo->json_array) {
147✔
3124
                            tools_cJSON_Delete(pThreadInfo->json_array);
147✔
3125
                            pThreadInfo->json_array = NULL;
147✔
3126
                        }
3127
                        pThreadInfo->json_array = tools_cJSON_CreateArray();
147✔
3128
                    } else {
3129
                        for (int j = 0; j < generated; j++) {
103,137✔
3130
                            debugPrint("pThreadInfo->lines[%d]: %s\n",
102,851✔
3131
                                       j, pThreadInfo->lines[j]);
3132
                            memset(pThreadInfo->lines[j], 0,
102,833✔
3133
                                   pThreadInfo->max_sql_len);
3134
                        }
3135
                    }
3136
                    break;
723✔
3137
                case STMT_IFACE:
225✔
3138
                    break;
225✔
3139
            }
3140

3141
            int64_t delay4 = endTs - startTs;
43,901✔
3142
            int64_t delay = delay1 + delay2 + delay3 + delay4;
43,901✔
3143
            if (delay <= 0) {
43,901✔
3144
                debugPrint("thread[%d]: startTs: %"PRId64", endTs: %"PRId64"\n",
×
3145
                        pThreadInfo->threadID, startTs, endTs);
3146
            } else {
3147
                perfPrint("insert execution time is %.6f s\n",
43,901✔
3148
                              delay / 1E6);
3149

3150
                int64_t * pDelay = benchCalloc(1, sizeof(int64_t), false);
43,901✔
3151
                *pDelay = delay;
43,997✔
3152
                if (benchArrayPush(pThreadInfo->delayList, pDelay) == NULL) {
43,997✔
3153
                    tmfree(pDelay);
×
3154
                }
3155
                pThreadInfo->totalDelay += delay;
43,888✔
3156
                pThreadInfo->totalDelay1 += delay1;
43,888✔
3157
                pThreadInfo->totalDelay2 += delay2;
43,888✔
3158
                pThreadInfo->totalDelay3 += delay3;
43,888✔
3159
            }
3160
            delay1 = delay2 = delay3 = 0;
43,888✔
3161

3162
            int64_t currentPrintTime = toolsGetTimestampMs();
43,888✔
3163
            if (currentPrintTime - lastPrintTime > 30 * 1000) {
43,916✔
3164
                infoPrint(
88✔
3165
                        "thread[%d] has currently inserted rows: "
3166
                        "%" PRId64 ", peroid insert rate: %.3f rows/s \n",
3167
                        pThreadInfo->threadID, pThreadInfo->totalInsertRows,
3168
                        (double)(pThreadInfo->totalInsertRows - lastTotalInsertRows) * 1000.0/(currentPrintTime - lastPrintTime));
UNCOV
3169
                lastPrintTime = currentPrintTime;
×
UNCOV
3170
                lastTotalInsertRows = pThreadInfo->totalInsertRows;
×
3171
            }
3172
            if (i >= stbInfo->insertRows) {
43,806✔
3173
                break;
11,629✔
3174
            }
3175
        }  // insertRows
3176
    }      // tableSeq
3177
free_of_progressive:
402✔
3178
    cleanupAndPrint(pThreadInfo, "progressive");
414✔
3179
    if(csvFile) {
565✔
3180
        fclose(csvFile);
×
3181
    }
3182
    tmfree(tagData);
565✔
3183
    return NULL;
565✔
3184
}
3185

3186
uint64_t strToTimestamp(char * tsStr) {
10,130✔
3187
    uint64_t ts = 0;
10,130✔
3188
    // remove double quota mark
3189
    if (tsStr[0] == '\"' || tsStr[0] == '\'') {
10,130✔
3190
        tsStr += 1;
×
3191
        int32_t last = strlen(tsStr) - 1;
×
3192
        if (tsStr[last] == '\"' || tsStr[0] == '\'') {
×
3193
            tsStr[last] = 0;
×
3194
        }
3195
    }
3196

3197
    if (toolsParseTime(tsStr, (int64_t*)&ts, strlen(tsStr), TSDB_TIME_PRECISION_MILLI, 0)) {
10,130✔
3198
        // not timestamp str format, maybe int64 format
3199
        ts = (int64_t)atol(tsStr);
10,130✔
3200
    }
3201

3202
    return ts;
10,130✔
3203
}
3204

3205
static int initStmtDataValue(SSuperTable *stbInfo, SChildTable *childTbl, uint64_t *bind_ts_array) {
103✔
3206
    int32_t columnCount = stbInfo->cols->size;
103✔
3207

3208
    char *sampleDataBuf;
3209
    if (childTbl) {
103✔
3210
        sampleDataBuf = childTbl->sampleDataBuf;
28✔
3211
    } else {
3212
        sampleDataBuf = stbInfo->sampleDataBuf;
75✔
3213
    }
3214
    int64_t lenOfOneRow = stbInfo->lenOfCols;
103✔
3215

3216
    if (stbInfo->useSampleTs) {
103✔
3217
        columnCount += 1;  // for skipping first column
14✔
3218
    }
3219
    for (int i=0; i < g_arguments->prepared_rand; i++) {
853,135✔
3220
        int cursor = 0;
853,032✔
3221

3222
        for (int c = 0; c < columnCount; c++) {
8,226,498✔
3223
            char *restStr = sampleDataBuf
7,373,466✔
3224
                + lenOfOneRow * i + cursor;
7,373,466✔
3225
            int lengthOfRest = strlen(restStr);
7,373,466✔
3226

3227
            int index = 0;
7,373,466✔
3228
            for (index = 0; index < lengthOfRest; index++) {
175,678,909✔
3229
                if (restStr[index] == ',') {
174,820,249✔
3230
                    break;
6,514,806✔
3231
                }
3232
            }
3233

3234
            cursor += index + 1;  // skip ',' too
7,373,466✔
3235

3236
            char *tmpStr = calloc(1, index + 1);
7,373,466✔
3237
            if (NULL == tmpStr) {
7,373,466✔
3238
                errorPrint("%s() LN%d, Failed to allocate %d bind buffer\n",
×
3239
                        __func__, __LINE__, index + 1);
3240
                return -1;
×
3241
            }
3242

3243
            strncpy(tmpStr, restStr, index);
7,373,466✔
3244
            if ((0 == c) && stbInfo->useSampleTs) {
7,373,466✔
3245
                // set ts to 
3246
                bind_ts_array[i] = strToTimestamp(tmpStr); 
10,130✔
3247
                free(tmpStr);
10,130✔
3248
                continue;
10,130✔
3249
            }
3250

3251
            Field *col = benchArrayGet(stbInfo->cols,
7,363,336✔
3252
                    (stbInfo->useSampleTs?c-1:c));
7,363,336✔
3253
            char dataType = col->type;
7,363,336✔
3254

3255
            StmtData *stmtData;
3256
            if (childTbl) {
7,363,336✔
3257
                ChildField *childCol =
3258
                    benchArrayGet(childTbl->childCols,
33,764✔
3259
                                  (stbInfo->useSampleTs?c-1:c));
33,764✔
3260
                stmtData = &childCol->stmtData;
33,764✔
3261
            } else {
3262
                stmtData = &col->stmtData;
7,329,572✔
3263
            }
3264

3265
            // set value
3266
            stmtData->is_null[i] = 0;
7,363,336✔
3267
            stmtData->lengths[i] = col->length;
7,363,336✔
3268

3269
            if (0 == strcmp(tmpStr, "NULL")) {
7,363,336✔
3270
                *(stmtData->is_null + i) = true;
28✔
3271
            } else {
3272
                switch (dataType) {
7,363,308✔
3273
                    case TSDB_DATA_TYPE_INT:
875,854✔
3274
                    case TSDB_DATA_TYPE_UINT:
3275
                        *((int32_t*)stmtData->data + i) = atoi(tmpStr);
875,854✔
3276
                        break;
875,854✔
3277
                    case TSDB_DATA_TYPE_FLOAT:
1,068,934✔
3278
                        *((float*)stmtData->data +i) = (float)atof(tmpStr);
1,068,934✔
3279
                        break;
1,068,934✔
3280
                    case TSDB_DATA_TYPE_DOUBLE:
2,650,850✔
3281
                        *((double*)stmtData->data + i) = atof(tmpStr);
2,650,850✔
3282
                        break;
2,650,850✔
3283
                    case TSDB_DATA_TYPE_TINYINT:
81,700✔
3284
                    case TSDB_DATA_TYPE_UTINYINT:
3285
                        *((int8_t*)stmtData->data + i) = (int8_t)atoi(tmpStr);
81,700✔
3286
                        break;
81,700✔
3287
                    case TSDB_DATA_TYPE_SMALLINT:
81,700✔
3288
                    case TSDB_DATA_TYPE_USMALLINT:
3289
                        *((int16_t*)stmtData->data + i) = (int16_t)atoi(tmpStr);
81,700✔
3290
                        break;
81,700✔
3291
                    case TSDB_DATA_TYPE_BIGINT:
81,700✔
3292
                    case TSDB_DATA_TYPE_UBIGINT:
3293
                        *((int64_t*)stmtData->data + i) = (int64_t)atol(tmpStr);
81,700✔
3294
                        break;
81,700✔
3295
                    case TSDB_DATA_TYPE_BOOL:
40,850✔
3296
                        *((int8_t*)stmtData->data + i) = (int8_t)atoi(tmpStr);
40,850✔
3297
                        break;
40,850✔
3298
                    case TSDB_DATA_TYPE_TIMESTAMP:
48,040✔
3299
                        *((int64_t*)stmtData->data + i) = (int64_t)atol(tmpStr);
48,040✔
3300
                        break;
48,040✔
3301
                    case TSDB_DATA_TYPE_BINARY:
2,433,680✔
3302
                    case TSDB_DATA_TYPE_NCHAR:
3303
                    case TSDB_DATA_TYPE_VARBINARY:
3304
                    case TSDB_DATA_TYPE_GEOMETRY:
3305
                        {
3306
                            size_t tmpLen = strlen(tmpStr);
2,433,680✔
3307
                            debugPrint("%s() LN%d, index: %d, "
2,433,680✔
3308
                                    "tmpStr len: %"PRIu64", col->length: %d\n",
3309
                                    __func__, __LINE__,
3310
                                    i, (uint64_t)tmpLen, col->length);
3311
                            if (tmpLen-2 > col->length) {
2,433,680✔
3312
                                errorPrint("data length %"PRIu64" "
×
3313
                                        "is larger than column length %d\n",
3314
                                        (uint64_t)tmpLen, col->length);
3315
                            }
3316
                            if (tmpLen > 2) {
2,433,680✔
3317
                                strncpy((char *)stmtData->data
2,334,565✔
3318
                                            + i * col->length,
2,334,565✔
3319
                                        tmpStr+1,
2,334,565✔
3320
                                        min(col->length, tmpLen - 2));
2,334,565✔
3321
                            } else {
3322
                                strncpy((char *)stmtData->data
99,115✔
3323
                                            + i*col->length,
99,115✔
3324
                                        "", 1);
3325
                            }
3326
                        }
3327
                        break;
2,433,680✔
3328
                    default:
×
3329
                        break;
×
3330
                }
3331
            }
3332
            free(tmpStr);
7,363,336✔
3333
        }
3334
    }
3335
    return 0;
103✔
3336
}
3337

3338
static void initStmtData(char dataType, void **data, uint32_t length) {
648✔
3339
    char *tmpP = NULL;
648✔
3340

3341
    switch (dataType) {
648✔
3342
        case TSDB_DATA_TYPE_INT:
109✔
3343
        case TSDB_DATA_TYPE_UINT:
3344
            tmpP = calloc(1, sizeof(int) * g_arguments->prepared_rand);
109✔
3345
            assert(tmpP);
109✔
3346
            tmfree(*data);
109✔
3347
            *data = (void*)tmpP;
109✔
3348
            break;
109✔
3349

3350
        case TSDB_DATA_TYPE_TINYINT:
30✔
3351
        case TSDB_DATA_TYPE_UTINYINT:
3352
            tmpP = calloc(1, sizeof(int8_t) * g_arguments->prepared_rand);
30✔
3353
            assert(tmpP);
30✔
3354
            tmfree(*data);
30✔
3355
            *data = (void*)tmpP;
30✔
3356
            break;
30✔
3357

3358
        case TSDB_DATA_TYPE_SMALLINT:
30✔
3359
        case TSDB_DATA_TYPE_USMALLINT:
3360
            tmpP = calloc(1, sizeof(int16_t) * g_arguments->prepared_rand);
30✔
3361
            assert(tmpP);
30✔
3362
            tmfree(*data);
30✔
3363
            *data = (void*)tmpP;
30✔
3364
            break;
30✔
3365

3366
        case TSDB_DATA_TYPE_BIGINT:
30✔
3367
        case TSDB_DATA_TYPE_UBIGINT:
3368
            tmpP = calloc(1, sizeof(int64_t) * g_arguments->prepared_rand);
30✔
3369
            assert(tmpP);
30✔
3370
            tmfree(*data);
30✔
3371
            *data = (void*)tmpP;
30✔
3372
            break;
30✔
3373

3374
        case TSDB_DATA_TYPE_BOOL:
15✔
3375
            tmpP = calloc(1, sizeof(int8_t) * g_arguments->prepared_rand);
15✔
3376
            assert(tmpP);
15✔
3377
            tmfree(*data);
15✔
3378
            *data = (void*)tmpP;
15✔
3379
            break;
15✔
3380

3381
        case TSDB_DATA_TYPE_FLOAT:
117✔
3382
            tmpP = calloc(1, sizeof(float) * g_arguments->prepared_rand);
117✔
3383
            assert(tmpP);
117✔
3384
            tmfree(*data);
117✔
3385
            *data = (void*)tmpP;
117✔
3386
            break;
117✔
3387

3388
        case TSDB_DATA_TYPE_DOUBLE:
146✔
3389
            tmpP = calloc(1, sizeof(double) * g_arguments->prepared_rand);
146✔
3390
            assert(tmpP);
146✔
3391
            tmfree(*data);
146✔
3392
            *data = (void*)tmpP;
146✔
3393
            break;
146✔
3394

3395
        case TSDB_DATA_TYPE_BINARY:
157✔
3396
        case TSDB_DATA_TYPE_NCHAR:
3397
        case TSDB_DATA_TYPE_VARBINARY:
3398
        case TSDB_DATA_TYPE_GEOMETRY:
3399
            tmpP = calloc(1, g_arguments->prepared_rand * length);
157✔
3400
            assert(tmpP);
157✔
3401
            tmfree(*data);
157✔
3402
            *data = (void*)tmpP;
157✔
3403
            break;
157✔
3404

3405
        case TSDB_DATA_TYPE_TIMESTAMP:
14✔
3406
            tmpP = calloc(1, sizeof(int64_t) * g_arguments->prepared_rand);
14✔
3407
            assert(tmpP);
14✔
3408
            tmfree(*data);
14✔
3409
            *data = (void*)tmpP;
14✔
3410
            break;
14✔
3411

3412
        default:
×
3413
            errorPrint("Unknown data type on initStmtData: %s\n",
×
3414
                       convertDatatypeToString(dataType));
3415
            exit(EXIT_FAILURE);
×
3416
    }
3417
}
648✔
3418

3419
static int parseBufferToStmtBatchChildTbl(SSuperTable *stbInfo,
28✔
3420
                                          SChildTable* childTbl, uint64_t *bind_ts_array) {
3421
    int32_t columnCount = stbInfo->cols->size;
28✔
3422

3423
    for (int c = 0; c < columnCount; c++) {
96✔
3424
        Field *col = benchArrayGet(stbInfo->cols, c);
68✔
3425
        ChildField *childCol = benchArrayGet(childTbl->childCols, c);
68✔
3426
        char dataType = col->type;
68✔
3427

3428
        // malloc memory
3429
        tmfree(childCol->stmtData.is_null);
68✔
3430
        tmfree(childCol->stmtData.lengths);
68✔
3431
        childCol->stmtData.is_null = benchCalloc(sizeof(char),     g_arguments->prepared_rand, true);
68✔
3432
        childCol->stmtData.lengths = benchCalloc(sizeof(int32_t),  g_arguments->prepared_rand, true);
68✔
3433

3434
        initStmtData(dataType, &(childCol->stmtData.data), col->length);
68✔
3435
    }
3436

3437
    return initStmtDataValue(stbInfo, childTbl, bind_ts_array);
28✔
3438
}
3439

3440
static int parseBufferToStmtBatch(SSuperTable* stbInfo, uint64_t *bind_ts_array) {
75✔
3441
    int32_t columnCount = stbInfo->cols->size;
75✔
3442

3443
    for (int c = 0; c < columnCount; c++) {
655✔
3444
        Field *col = benchArrayGet(stbInfo->cols, c);
580✔
3445

3446
        //remalloc element count is g_arguments->prepared_rand buffer
3447
        tmfree(col->stmtData.is_null);
580✔
3448
        col->stmtData.is_null = benchCalloc(sizeof(char), g_arguments->prepared_rand, false);
580✔
3449
        tmfree(col->stmtData.lengths);
580✔
3450
        col->stmtData.lengths = benchCalloc(sizeof(int32_t), g_arguments->prepared_rand, false);
580✔
3451

3452
        initStmtData(col->type, &(col->stmtData.data), col->length);
580✔
3453
    }
3454

3455
    return initStmtDataValue(stbInfo, NULL, bind_ts_array);
75✔
3456
}
3457

3458
static int64_t fillChildTblNameByCount(SSuperTable *stbInfo) {
218✔
3459
    for (int64_t i = 0; i < stbInfo->childTblCount; i++) {
32,138✔
3460
        char childName[TSDB_TABLE_NAME_LEN]={0};
31,920✔
3461
        snprintf(childName,
31,920✔
3462
                 TSDB_TABLE_NAME_LEN,
3463
                 "%s%" PRIu64 "",
3464
                 stbInfo->childTblPrefix, i);
3465
        stbInfo->childTblArray[i]->name = strdup(childName);
31,920✔
3466
        debugPrint("%s(): %s\n", __func__,
31,920✔
3467
                  stbInfo->childTblArray[i]->name);
3468
    }
3469

3470
    return stbInfo->childTblCount;
218✔
3471
}
3472

3473
static int64_t fillChildTblNameByFromTo(SDataBase *database,
3✔
3474
        SSuperTable* stbInfo) {
3475
    for (int64_t i = stbInfo->childTblFrom; i <= stbInfo->childTblTo; i++) {
13✔
3476
        char childName[TSDB_TABLE_NAME_LEN]={0};
10✔
3477
        snprintf(childName,
10✔
3478
                TSDB_TABLE_NAME_LEN,
3479
                "%s%" PRIu64 "",
3480
                stbInfo->childTblPrefix, i);
3481
        stbInfo->childTblArray[i]->name = strdup(childName);
10✔
3482
    }
3483

3484
    return (stbInfo->childTblTo-stbInfo->childTblFrom);
3✔
3485
}
3486

3487
static int64_t fillChildTblNameByLimitOffset(SDataBase *database,
5✔
3488
        SSuperTable* stbInfo) {
3489
    SBenchConn* conn = initBenchConn();
5✔
3490
    if (NULL == conn) {
5✔
3491
        return -1;
×
3492
    }
3493
    char cmd[SHORT_1K_SQL_BUFF_LEN] = "\0";
5✔
3494
    if (g_arguments->taosc_version == 3) {
5✔
3495
        snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
5✔
3496
                 "SELECT DISTINCT(TBNAME) FROM %s.`%s` LIMIT %" PRId64
3497
                 " OFFSET %" PRIu64 "",
3498
                 database->dbName, stbInfo->stbName, stbInfo->childTblLimit,
3499
                 stbInfo->childTblOffset);
3500
    } else {
3501
        snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
×
3502
                 "SELECT TBNAME FROM %s.`%s` LIMIT %" PRId64
3503
                 " OFFSET %" PRIu64 "",
3504
                 database->dbName, stbInfo->stbName, stbInfo->childTblLimit,
3505
                 stbInfo->childTblOffset);
3506
    }
3507
    debugPrint("cmd: %s\n", cmd);
5✔
3508
    TAOS_RES *res = taos_query(conn->taos, cmd);
5✔
3509
    int32_t   code = taos_errno(res);
5✔
3510
    int64_t   count = 0;
5✔
3511
    if (code) {
5✔
3512
        printErrCmdCodeStr(cmd, code, res);
3✔
3513
        closeBenchConn(conn);
3✔
3514
        return -1;
3✔
3515
    }
3516
    TAOS_ROW row = NULL;
2✔
3517
    while ((row = taos_fetch_row(res)) != NULL) {
6✔
3518
        int *lengths = taos_fetch_lengths(res);
4✔
3519
        char * childName = benchCalloc(1, lengths[0] + 1, true);
4✔
3520
        strncpy(childName, row[0], lengths[0]);
4✔
3521
        childName[lengths[0]] = '\0';
4✔
3522
        stbInfo->childTblArray[count]->name = childName;
4✔
3523
        debugPrint("stbInfo->childTblArray[%" PRId64 "]->name: %s\n",
4✔
3524
                   count, stbInfo->childTblArray[count]->name);
3525
        count++;
4✔
3526
    }
3527
    taos_free_result(res);
2✔
3528
    closeBenchConn(conn);
2✔
3529
    return count;
2✔
3530
}
3531

3532
static void preProcessArgument(SSuperTable *stbInfo) {
221✔
3533
    if (stbInfo->interlaceRows > g_arguments->reqPerReq) {
221✔
3534
        infoPrint(
7✔
3535
            "interlaceRows(%d) is larger than record per request(%u), which "
3536
            "will be set to %u\n",
3537
            stbInfo->interlaceRows, g_arguments->reqPerReq,
3538
            g_arguments->reqPerReq);
3539
        stbInfo->interlaceRows = g_arguments->reqPerReq;
7✔
3540
    }
3541

3542
    if (stbInfo->interlaceRows > stbInfo->insertRows) {
221✔
3543
        infoPrint(
1✔
3544
                "interlaceRows larger than insertRows %d > %" PRId64 "\n",
3545
                stbInfo->interlaceRows, stbInfo->insertRows);
3546
        infoPrint("%s", "interlaceRows will be set to 0\n");
1✔
3547
        stbInfo->interlaceRows = 0;
1✔
3548
    }
3549

3550
    if (stbInfo->interlaceRows == 0
221✔
3551
            && g_arguments->reqPerReq > stbInfo->insertRows) {
192✔
3552
        infoPrint("record per request (%u) is larger than "
85✔
3553
                "insert rows (%"PRIu64")"
3554
                " in progressive mode, which will be set to %"PRIu64"\n",
3555
                g_arguments->reqPerReq, stbInfo->insertRows,
3556
                stbInfo->insertRows);
3557
        g_arguments->reqPerReq = stbInfo->insertRows;
85✔
3558
    }
3559

3560
    if (stbInfo->interlaceRows > 0 && stbInfo->iface == STMT_IFACE
221✔
3561
            && stbInfo->autoTblCreating) {
8✔
3562
        errorPrint("%s","stmt not support autocreate table with interlace row , quit programe!\n");
×
3563
        exit(-1);
×
3564
    }
3565
}
221✔
3566

3567
static int printTotalDelay(SDataBase *database,
221✔
3568
                           int64_t totalDelay,
3569
                           int64_t totalDelay1,
3570
                           int64_t totalDelay2,
3571
                           int64_t totalDelay3,
3572
                           BArray *total_delay_list,
3573
                            int threads,
3574
                            int64_t totalInsertRows,
3575
                            int64_t spend) {
3576
    // zero check
3577
    if (total_delay_list->size == 0 || spend == 0 || threads == 0) {
221✔
3578
        return -1;
1✔
3579
    }
3580

3581
    char subDelay[128] = "";
220✔
3582
    if(totalDelay1 + totalDelay2 + totalDelay3 > 0) {
220✔
3583
        sprintf(subDelay, " stmt delay1=%.2fs delay2=%.2fs delay3=%.2fs",
26✔
3584
                totalDelay1/threads/1E6,
26✔
3585
                totalDelay2/threads/1E6,
26✔
3586
                totalDelay3/threads/1E6);
26✔
3587
    }
3588

3589
    succPrint("Spent %.6f (real %.6f) seconds to insert rows: %" PRIu64
220✔
3590
              " with %d thread(s) into %s %.2f (real %.2f) records/second%s\n",
3591
              spend/1E6, totalDelay/threads/1E6, totalInsertRows, threads,
3592
              database->dbName,
3593
              (double)(totalInsertRows / (spend/1E6)),
3594
              (double)(totalInsertRows / (totalDelay/threads/1E6)), subDelay);
3595
    if (!total_delay_list->size) {
220✔
3596
        return -1;
×
3597
    }
3598

3599
    succPrint("insert delay, "
220✔
3600
              "min: %.4fms, "
3601
              "avg: %.4fms, "
3602
              "p90: %.4fms, "
3603
              "p95: %.4fms, "
3604
              "p99: %.4fms, "
3605
              "max: %.4fms\n",
3606
              *(int64_t *)(benchArrayGet(total_delay_list, 0))/1E3,
3607
              (double)totalDelay/total_delay_list->size/1E3,
3608
              *(int64_t *)(benchArrayGet(total_delay_list,
3609
                                         (int32_t)(total_delay_list->size
3610
                                         * 0.9)))/1E3,
3611
              *(int64_t *)(benchArrayGet(total_delay_list,
3612
                                         (int32_t)(total_delay_list->size
3613
                                         * 0.95)))/1E3,
3614
              *(int64_t *)(benchArrayGet(total_delay_list,
3615
                                         (int32_t)(total_delay_list->size
3616
                                         * 0.99)))/1E3,
3617
              *(int64_t *)(benchArrayGet(total_delay_list,
3618
                                         (int32_t)(total_delay_list->size
3619
                                         - 1)))/1E3);
3620
    return 0;
220✔
3621
}
3622

3623
static int64_t fillChildTblNameImp(SDataBase *database, SSuperTable *stbInfo) {
9✔
3624
    int64_t ntables;
3625
    if (stbInfo->childTblLimit) {
9✔
3626
        ntables = fillChildTblNameByLimitOffset(database, stbInfo);
5✔
3627
    } else if (stbInfo->childTblFrom || stbInfo->childTblTo) {
4✔
3628
        ntables = fillChildTblNameByFromTo(database, stbInfo);
3✔
3629
    } else {
3630
        ntables = fillChildTblNameByCount(stbInfo);
1✔
3631
    }
3632
    return ntables;
9✔
3633
}
3634

3635
static int64_t fillChildTblName(SDataBase *database, SSuperTable *stbInfo) {
229✔
3636
    int64_t ntables = stbInfo->childTblCount;
229✔
3637
    stbInfo->childTblArray = benchCalloc(stbInfo->childTblCount,
229✔
3638
            sizeof(SChildTable*), true);
3639
    for (int64_t child = 0; child < stbInfo->childTblCount; child++) {
32,244✔
3640
        stbInfo->childTblArray[child] =
32,015✔
3641
            benchCalloc(1, sizeof(SChildTable), true);
32,015✔
3642
    }
3643

3644
    if (stbInfo->childTblCount == 1 && stbInfo->tags->size == 0) {
229✔
3645
        // Normal table
3646
        char childName[TSDB_TABLE_NAME_LEN]={0};
3✔
3647
        snprintf(childName, TSDB_TABLE_NAME_LEN,
3✔
3648
                    "%s", stbInfo->stbName);
3649
        stbInfo->childTblArray[0]->name = strdup(childName);
3✔
3650
    } else if ((stbInfo->iface != SML_IFACE
226✔
3651
                && stbInfo->iface != SML_REST_IFACE)
168✔
3652
            && stbInfo->childTblExists) {
154✔
3653
        ntables = fillChildTblNameImp(database, stbInfo);
9✔
3654
    } else {
3655
        ntables = fillChildTblNameByCount(stbInfo);
217✔
3656
    }
3657

3658
    return ntables;
229✔
3659
}
3660

3661
// last ts fill to filllBackTime
3662
static bool fillSTableLastTs(SDataBase *database, SSuperTable *stbInfo) {
×
3663
    SBenchConn* conn = initBenchConn();
×
3664
    if (NULL == conn) {
×
3665
        return false;
×
3666
    }
3667
    char cmd[SHORT_1K_SQL_BUFF_LEN] = "\0";
×
3668
    snprintf(cmd, SHORT_1K_SQL_BUFF_LEN, "select last(ts) from %s.`%s`", database->dbName, stbInfo->stbName);
×
3669

3670
    infoPrint("fillBackTime: %s\n", cmd);
×
3671
    TAOS_RES *res = taos_query(conn->taos, cmd);
×
3672
    int32_t   code = taos_errno(res);
×
3673
    if (code) {
×
3674
        printErrCmdCodeStr(cmd, code, res);
×
3675
        closeBenchConn(conn);
×
3676
        return false;
×
3677
    }
3678

3679
    TAOS_ROW row = taos_fetch_row(res);
×
3680
    if(row == NULL) {
×
3681
        taos_free_result(res);
×
3682
        closeBenchConn(conn);
×
3683
        return false;
×
3684
    }
3685
    
3686
    char lastTs[128];
3687
    memset(lastTs, 0, sizeof(lastTs));
×
3688

3689
    stbInfo->startFillbackTime = *(int64_t*)row[0];
×
3690
    toolsFormatTimestamp(lastTs, stbInfo->startFillbackTime, database->precision);
×
3691
    infoPrint("fillBackTime: get ok %s.%s last ts=%s \n", database->dbName, stbInfo->stbName, lastTs);
×
3692
    
3693
    taos_free_result(res);
×
3694
    closeBenchConn(conn);
×
3695

3696
    return true;
×
3697
}
3698

3699
// calcNow expression fill to timestamp_start
3700
static bool calcExprFromServer(SDataBase *database, SSuperTable *stbInfo) {
×
3701
    SBenchConn* conn = initBenchConn();
×
3702
    if (NULL == conn) {
×
3703
        return false;
×
3704
    }
3705
    char cmd[SHORT_1K_SQL_BUFF_LEN] = "\0";
×
3706
    snprintf(cmd, SHORT_1K_SQL_BUFF_LEN, "select %s", stbInfo->calcNow);
×
3707

3708
    infoPrint("calcExprFromServer: %s\n", cmd);
×
3709
    TAOS_RES *res = taos_query(conn->taos, cmd);
×
3710
    int32_t   code = taos_errno(res);
×
3711
    if (code) {
×
3712
        printErrCmdCodeStr(cmd, code, res);
×
3713
        closeBenchConn(conn);
×
3714
        return false;
×
3715
    }
3716

3717
    TAOS_ROW row = taos_fetch_row(res);
×
3718
    if(row == NULL) {
×
3719
        taos_free_result(res);
×
3720
        closeBenchConn(conn);
×
3721
        return false;
×
3722
    }
3723
    
3724
    char ts[128];
3725
    memset(ts, 0, sizeof(ts));
×
3726

3727
    stbInfo->startTimestamp = *(int64_t*)row[0];
×
3728
    toolsFormatTimestamp(ts, stbInfo->startTimestamp, database->precision);
×
3729
    infoPrint("calcExprFromServer: get ok.  %s = %s \n", stbInfo->calcNow, ts);
×
3730
    
3731
    taos_free_result(res);
×
3732
    closeBenchConn(conn);
×
3733

3734
    return true;
×
3735
}
3736

3737
int64_t obtainTableCount(SDataBase* database, SSuperTable* stbInfo) {
221✔
3738
    // ntable calc
3739
    int64_t ntables;
3740
    if (stbInfo->childTblTo > 0) {
221✔
3741
        ntables = stbInfo->childTblTo - stbInfo->childTblFrom;
4✔
3742
    } else if (stbInfo->childTblLimit > 0 && stbInfo->childTblExists) {
217✔
3743
        ntables = stbInfo->childTblLimit;
3✔
3744
    } else {
3745
        ntables = stbInfo->childTblCount;
214✔
3746
    }
3747

3748
    return ntables;
221✔
3749
}
3750

3751
// assign table to thread with vgroups, return assign thread count
3752
int32_t assignTableToThread(SDataBase* database, SSuperTable* stbInfo) {
4✔
3753
    SBenchConn* conn = initBenchConn();
4✔
3754
    if (NULL == conn) {
4✔
3755
        return 0;
×
3756
    }
3757
    int32_t threads = 0;
4✔
3758

3759
    // calc table count per vgroup
3760
    for (int64_t i = 0; i < stbInfo->childTblCount; i++) {
151✔
3761
        int32_t vgIdx = calcGroupIndex(database->dbName, stbInfo->childTblArray[i]->name, database->vgroups);
147✔
3762
        if (vgIdx == -1) {
147✔
3763
            continue;
×
3764
        }
3765
        SVGroup *vg = benchArrayGet(database->vgArray, vgIdx);
147✔
3766
        vg->tbCountPerVgId ++;
147✔
3767
    }
3768

3769
    // malloc vg->childTblArray memory with table count
3770
    for (int v = 0; v < database->vgroups; v++) {
18✔
3771
        SVGroup *vg = benchArrayGet(database->vgArray, v);
14✔
3772
        infoPrint("Local hash calc %"PRId64" tables on %s's vgroup %d (id: %d)\n",
14✔
3773
                    vg->tbCountPerVgId, database->dbName, v, vg->vgId);
3774
        if (vg->tbCountPerVgId) {
14✔
3775
            threads++;
14✔
3776
        } else {
3777
            continue;
×
3778
        }
3779
        vg->childTblArray = benchCalloc(vg->tbCountPerVgId, sizeof(SChildTable*), true);
14✔
3780
        vg->tbOffset      = 0;
14✔
3781
    }
3782
    
3783
    // set vg->childTblArray data
3784
    for (int64_t i = 0; i < stbInfo->childTblCount; i++) {
151✔
3785
        int32_t vgIdx = calcGroupIndex(database->dbName, stbInfo->childTblArray[i]->name, database->vgroups);
147✔
3786
        if (vgIdx == -1) {
147✔
3787
            continue;
×
3788
        }
3789
        SVGroup *vg = benchArrayGet(database->vgArray, vgIdx);
147✔
3790
        debugPrint("calc table hash to vgroup %s.%s vgIdx=%d\n",
147✔
3791
                    database->dbName,
3792
                    stbInfo->childTblArray[i]->name, vgIdx);
3793
        vg->childTblArray[vg->tbOffset] = stbInfo->childTblArray[i];
147✔
3794
        vg->tbOffset++;
147✔
3795
    }
3796
    closeBenchConn(conn);
4✔
3797
    return threads;
4✔
3798
}
3799

3800
// init stmt
3801
TAOS_STMT* initStmt(TAOS* taos, bool single) {
59✔
3802
    if (!single) {
59✔
3803
        infoPrint("initStmt call taos_stmt_init single=%d\n", single);
18✔
3804
        return taos_stmt_init(taos);
18✔
3805
    }
3806

3807
    TAOS_STMT_OPTIONS op;
3808
    memset(&op, 0, sizeof(op));
41✔
3809
    op.singleStbInsert      = single;
41✔
3810
    op.singleTableBindOnce  = single;
41✔
3811
    infoPrint("initStmt call taos_stmt_init_with_options single=%d\n", single);
41✔
3812
    return taos_stmt_init_with_options(taos, &op);
41✔
3813
}
3814

3815
// init stmt2
3816
TAOS_STMT2* initStmt2(TAOS* taos, bool single) {
16✔
3817
    TAOS_STMT2_OPTION op2;
3818
    memset(&op2, 0, sizeof(op2));
16✔
3819
    op2.singleStbInsert      = single;
16✔
3820
    op2.singleTableBindOnce  = single;
16✔
3821
    
3822
    TAOS_STMT2* stmt2 = taos_stmt2_init(taos, &op2);
16✔
3823
    if (stmt2) 
16✔
3824
        succPrint("succ  taos_stmt2_init single=%d\n", single);
16✔
3825
    else
3826
        errorPrint("failed taos_stmt2_init single=%d\n", single);
×
3827
    return stmt2;
16✔
3828
}
3829

3830
// init insert thread
3831
int32_t initInsertThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthreads, threadInfo *infos, int64_t div, int64_t mod) {
221✔
3832
    int32_t  ret     = -1;
221✔
3833
    uint64_t tbNext  = stbInfo->childTblFrom;
221✔
3834
    int32_t  vgNext  = 0;
221✔
3835
    FILE*    csvFile = NULL;
221✔
3836
    char*    tagData = NULL;
221✔
3837
    bool     stmtN   = (stbInfo->iface == STMT_IFACE || stbInfo->iface == STMT2_IFACE) && stbInfo->autoTblCreating == false;
221✔
3838

3839
    if (stmtN) {
221✔
3840
        csvFile = openTagCsv(stbInfo);
23✔
3841
        tagData = benchCalloc(TAG_BATCH_COUNT, stbInfo->lenOfTags, false);
23✔
3842
    }
3843
    
3844
    for (int32_t i = 0; i < nthreads; i++) {
878✔
3845
        // set table
3846
        threadInfo *pThreadInfo = infos + i;
657✔
3847
        pThreadInfo->threadID   = i;
657✔
3848
        pThreadInfo->dbInfo     = database;
657✔
3849
        pThreadInfo->stbInfo    = stbInfo;
657✔
3850
        pThreadInfo->start_time = stbInfo->startTimestamp;
657✔
3851
        pThreadInfo->pos        = 0;
657✔
3852
        pThreadInfo->samplePos  = 0;
657✔
3853
        pThreadInfo->totalInsertRows = 0;
657✔
3854

3855
        if (g_arguments->bind_vgroup) {
657✔
3856
            for (int32_t j = vgNext; j < database->vgroups; j++) {
14✔
3857
                SVGroup *vg = benchArrayGet(database->vgArray, j);
14✔
3858
                if (0 == vg->tbCountPerVgId) {
14✔
3859
                    continue;
×
3860
                }
3861
                pThreadInfo->vg               = vg;
14✔
3862
                pThreadInfo->ntables          = vg->tbCountPerVgId;
14✔
3863
                pThreadInfo->start_table_from = 0;
14✔
3864
                pThreadInfo->end_table_to     = vg->tbCountPerVgId - 1;
14✔
3865
                vgNext                        = j + 1;
14✔
3866
                break;
14✔
3867
            }            
3868
        } else {
3869
            pThreadInfo->start_table_from = tbNext;
643✔
3870
            pThreadInfo->ntables          = i < mod ? div + 1 : div;
643✔
3871
            pThreadInfo->end_table_to     = i < mod ? tbNext + div : tbNext + div - 1;
643✔
3872
            tbNext                        = pThreadInfo->end_table_to + 1;
643✔
3873
        }
3874

3875
        // init conn
3876
        pThreadInfo->delayList = benchArrayInit(1, sizeof(int64_t));
657✔
3877
        switch (stbInfo->iface) {
657✔
3878
            // rest
3879
            case REST_IFACE: {
13✔
3880
                if (stbInfo->interlaceRows > 0) {
13✔
3881
                    pThreadInfo->buffer = new_ds(0);
×
3882
                } else {
3883
                    pThreadInfo->buffer = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, true);
13✔
3884
                }
3885
                int sockfd = createSockFd();
13✔
3886
                if (sockfd < 0) {
13✔
3887
                    goto END;
×
3888
                }
3889
                pThreadInfo->sockfd = sockfd;
13✔
3890
                break;
13✔
3891
            }
3892
            // stmt & stmt2 init
3893
            case STMT_IFACE: 
75✔
3894
            case STMT2_IFACE: {
3895
                pThreadInfo->conn = initBenchConn();
75✔
3896
                if (NULL == pThreadInfo->conn) {
75✔
3897
                    goto END;
×
3898
                }
3899
                bool single = true;
75✔
3900
                if (database->superTbls->size > 1) {
75✔
3901
                    single = false;
18✔
3902
                }
3903

3904
                if (stbInfo->iface == STMT2_IFACE) {
75✔
3905
                    // stmt2 init
3906
                    if (pThreadInfo->conn->stmt2)
16✔
3907
                        taos_stmt2_close(pThreadInfo->conn->stmt2);
×
3908
                    pThreadInfo->conn->stmt2 = initStmt2(pThreadInfo->conn->taos, single);
16✔
3909
                    if (NULL == pThreadInfo->conn->stmt2) {
16✔
3910
                        errorPrint("taos_stmt2_init() failed, reason: %s\n", taos_errstr(NULL));
×
3911
                        goto END;                    
×
3912
                    }
3913
                } else {
3914
                    // stmt init
3915
                    if (pThreadInfo->conn->stmt)
59✔
3916
                        taos_stmt_close(pThreadInfo->conn->stmt);
×
3917
                    pThreadInfo->conn->stmt = initStmt(pThreadInfo->conn->taos, single);
59✔
3918
                    if (NULL == pThreadInfo->conn->stmt) {
59✔
3919
                        errorPrint("taos_stmt_init() failed, reason: %s\n", taos_errstr(NULL));
×
3920
                        goto END;                    
×
3921
                    }
3922
                }
3923

3924
                // select db
3925
                if (taos_select_db(pThreadInfo->conn->taos, database->dbName)) {
75✔
3926
                    errorPrint("taos select database(%s) failed\n", database->dbName);
×
3927
                    goto END;
×
3928
                }
3929

3930
                // malloc bind
3931
                int32_t unit = stbInfo->iface == STMT2_IFACE ? sizeof(TAOS_STMT2_BIND) : sizeof(TAOS_MULTI_BIND);
75✔
3932
                pThreadInfo->bind_ts       = benchCalloc(1, sizeof(int64_t), true);
75✔
3933
                pThreadInfo->bind_ts_array = benchCalloc(1, sizeof(int64_t)*g_arguments->prepared_rand, true);
75✔
3934
                pThreadInfo->bindParams    = benchCalloc(1, unit * (stbInfo->cols->size + 1), true);
75✔
3935
                pThreadInfo->is_null       = benchCalloc(1, g_arguments->reqPerReq, true);
75✔
3936
                // have ts columns, so size + 1
3937
                pThreadInfo->lengths       = benchCalloc(stbInfo->cols->size + 1, sizeof(int32_t*), true);
75✔
3938
                for(int32_t c = 0; c <= stbInfo->cols->size; c++) {
730✔
3939
                    pThreadInfo->lengths[c] = benchCalloc(g_arguments->reqPerReq, sizeof(int32_t), true);
655✔
3940
                }
3941
                // tags data
3942
                pThreadInfo->tagsStmt = copyBArray(stbInfo->tags);
75✔
3943
                for(int32_t n = 0; n < pThreadInfo->tagsStmt->size; n ++ ) {
452✔
3944
                    Field *field = benchArrayGet(pThreadInfo->tagsStmt, n);
377✔
3945
                    memset(&field->stmtData, 0, sizeof(StmtData));
377✔
3946
                }
3947
                
3948
                parseBufferToStmtBatch(stbInfo, pThreadInfo->bind_ts_array);
75✔
3949
                for (int64_t child = 0; child < stbInfo->childTblCount; child++) {
1,440✔
3950
                    SChildTable *childTbl = stbInfo->childTblArray[child];
1,365✔
3951
                    if (childTbl->useOwnSample) {
1,365✔
3952
                        parseBufferToStmtBatchChildTbl(stbInfo, childTbl, pThreadInfo->bind_ts_array);
28✔
3953
                    }
3954
                }
3955

3956
                break;
75✔
3957
            }
3958
            // sml rest
3959
            case SML_REST_IFACE: {
37✔
3960
                int sockfd = createSockFd();
37✔
3961
                if (sockfd < 0) {
37✔
3962
                    goto END;
×
3963
                }
3964
                pThreadInfo->sockfd = sockfd;
37✔
3965
            }
3966
            // sml
3967
            case SML_IFACE: {
237✔
3968
                if (stbInfo->iface == SML_IFACE) {
237✔
3969
                    pThreadInfo->conn = initBenchConn();
200✔
3970
                    if (pThreadInfo->conn == NULL) {
200✔
3971
                        errorPrint("%s() init connection failed\n", __func__);
×
3972
                        goto END;
×
3973
                    }
3974
                    if (taos_select_db(pThreadInfo->conn->taos, database->dbName)) {
200✔
3975
                        errorPrint("taos select database(%s) failed\n", database->dbName);
×
3976
                        goto END;
×
3977
                    }
3978
                }
3979
                pThreadInfo->max_sql_len = stbInfo->lenOfCols + stbInfo->lenOfTags;
237✔
3980
                if (stbInfo->iface == SML_REST_IFACE) {
237✔
3981
                    pThreadInfo->buffer = benchCalloc(1, g_arguments->reqPerReq * (1 + pThreadInfo->max_sql_len), true);
37✔
3982
                }
3983
                int protocol = stbInfo->lineProtocol;
237✔
3984
                if (TSDB_SML_JSON_PROTOCOL != protocol && SML_JSON_TAOS_FORMAT != protocol) {
237✔
3985
                    pThreadInfo->sml_tags = (char **)benchCalloc(pThreadInfo->ntables, sizeof(char *), true);
111✔
3986
                    for (int t = 0; t < pThreadInfo->ntables; t++) {
324✔
3987
                        pThreadInfo->sml_tags[t] = benchCalloc(1, stbInfo->lenOfTags, true);
213✔
3988
                    }
3989

3990
                    for (int t = 0; t < pThreadInfo->ntables; t++) {
324✔
3991
                        if (generateRandData(
213✔
3992
                                    stbInfo, pThreadInfo->sml_tags[t],
213✔
3993
                                    stbInfo->lenOfTags,
213✔
3994
                                    stbInfo->lenOfCols + stbInfo->lenOfTags,
213✔
3995
                                    stbInfo->tags, 1, true, NULL)) {
3996
                            return -1;
×
3997
                        }
3998
                        debugPrint("pThreadInfo->sml_tags[%d]: %s\n", t,
213✔
3999
                                   pThreadInfo->sml_tags[t]);
4000
                    }
4001
                    pThreadInfo->lines = benchCalloc(g_arguments->reqPerReq, sizeof(char *), true);
111✔
4002
                    for (int j = 0; (j < g_arguments->reqPerReq && !g_arguments->terminate); j++) {
101,049✔
4003
                        pThreadInfo->lines[j] = benchCalloc(1, pThreadInfo->max_sql_len, true);
100,938✔
4004
                    }
4005
                } else {
4006
                    pThreadInfo->json_array          = tools_cJSON_CreateArray();
126✔
4007
                    pThreadInfo->sml_json_tags       = tools_cJSON_CreateArray();
126✔
4008
                    pThreadInfo->sml_tags_json_array = (char **)benchCalloc( pThreadInfo->ntables, sizeof(char *), true);
126✔
4009
                    for (int t = 0; t < pThreadInfo->ntables; t++) {
372✔
4010
                        if (stbInfo->lineProtocol == TSDB_SML_JSON_PROTOCOL) {
246✔
4011
                            generateSmlJsonTags(
171✔
4012
                                pThreadInfo->sml_json_tags,
4013
                                    pThreadInfo->sml_tags_json_array,
4014
                                    stbInfo,
4015
                                pThreadInfo->start_table_from, t);
4016
                        } else {
4017
                            generateSmlTaosJsonTags(
75✔
4018
                                pThreadInfo->sml_json_tags, stbInfo,
4019
                                pThreadInfo->start_table_from, t);
4020
                        }
4021
                    }
4022
                    pThreadInfo->lines = (char **)benchCalloc(1, sizeof(char *), true);
126✔
4023
                    if (0 == stbInfo->interlaceRows && TSDB_SML_JSON_PROTOCOL == protocol) {
126✔
4024
                        pThreadInfo->line_buf_len = g_arguments->reqPerReq * accumulateRowLen(pThreadInfo->stbInfo->tags, pThreadInfo->stbInfo->iface);
75✔
4025
                        debugPrint("%s() LN%d, line_buf_len=%d\n", __func__, __LINE__, pThreadInfo->line_buf_len);
75✔
4026
                        pThreadInfo->lines[0]             = benchCalloc(1, pThreadInfo->line_buf_len, true);
75✔
4027
                        pThreadInfo->sml_json_value_array = (char **)benchCalloc(pThreadInfo->ntables, sizeof(char *), true);
75✔
4028
                        for (int t = 0; t < pThreadInfo->ntables; t++) {
222✔
4029
                            generateSmlJsonValues(pThreadInfo->sml_json_value_array, stbInfo, t);
147✔
4030
                        }
4031
                    }
4032
                }
4033
                break;
237✔
4034
            }
4035
            // taos
4036
            case TAOSC_IFACE: {
332✔
4037
                pThreadInfo->conn = initBenchConn();
332✔
4038
                if (pThreadInfo->conn == NULL) {
332✔
4039
                    errorPrint("%s() failed to connect\n", __func__);
×
4040
                    goto END;
×
4041
                }
4042
                char* command = benchCalloc(1, SHORT_1K_SQL_BUFF_LEN, false);
332✔
4043
                snprintf(command, SHORT_1K_SQL_BUFF_LEN,
332✔
4044
                        g_arguments->escape_character ? "USE `%s`" : "USE %s",
332✔
4045
                        database->dbName);
4046
                if (queryDbExecCall(pThreadInfo->conn, command)) {
332✔
4047
                    errorPrint("taos select database(%s) failed\n", database->dbName);
×
4048
                    goto END;
×
4049
                }
4050
                tmfree(command);
332✔
4051
                command = NULL;
332✔
4052

4053
                if (stbInfo->interlaceRows > 0) {
332✔
4054
                    pThreadInfo->buffer = new_ds(0);
18✔
4055
                } else {
4056
                    pThreadInfo->buffer = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, true);
314✔
4057
                    if (g_arguments->check_sql) {
314✔
4058
                        pThreadInfo->csql = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, true);
4✔
4059
                        memset(pThreadInfo->csql, 0, TSDB_MAX_ALLOWED_SQL_LEN);
4✔
4060
                    }
4061
                }
4062

4063
                break;
332✔
4064
            }
4065
            default:
×
4066
                break;
×
4067
        }
4068
    }
4069

4070
    // success
4071
    ret = 0;
221✔
4072

4073
END:
221✔
4074
    if (csvFile) {
221✔
4075
        fclose(csvFile);
3✔
4076
    }
4077
    tmfree(tagData);
221✔
4078
    return ret;
221✔
4079
}
4080

4081
#ifdef LINUX
4082
#define EMPTY_SLOT -1
4083
// run with limit thread
4084
int32_t runInsertLimitThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthreads, int32_t limitThread, threadInfo *infos, pthread_t *pids) {
×
4085
    infoPrint("run with bind vgroups limit thread. limit threads=%d nthread=%d\n", limitThread, nthreads);
×
4086
    
4087
    // slots save threadInfo array index
4088
    int32_t* slot = benchCalloc(limitThread, sizeof(int32_t), false); 
×
4089
    int32_t  t    = 0; // thread index
×
4090
    for (int32_t i = 0; i < limitThread; i++) {
×
4091
        slot[i] = EMPTY_SLOT;
×
4092
    }
4093

4094
    while (!g_arguments->terminate) {
×
4095
        int32_t emptySlot = 0;
×
4096
        for (int32_t i = 0; i < limitThread; i++) {
×
4097
            int32_t idx = slot[i];
×
4098
            // check slot thread end
4099
            if(idx != EMPTY_SLOT) {
×
4100
                if (pthread_tryjoin_np(pids[idx], NULL) == EBUSY ) {
×
4101
                    // thread is running
4102
                    toolsMsleep(2000);
×
4103
                } else {
4104
                    // thread is end , set slot empty 
4105
                    infoPrint("slot[%d] finished tidx=%d. completed thread count=%d\n", i, slot[i], t);
×
4106
                    slot[i] = EMPTY_SLOT;
×
4107
                }
4108
            } 
4109

4110
            if (slot[i] == EMPTY_SLOT && t < nthreads) {
×
4111
                // slot is empty , set new thread to running
4112
                threadInfo *pThreadInfo = infos + t;
×
4113
                if (stbInfo->interlaceRows > 0) {
×
4114
                    pthread_create(pids + t, NULL, syncWriteInterlace,   pThreadInfo);
×
4115
                } else {
4116
                    pthread_create(pids + t, NULL, syncWriteProgressive, pThreadInfo);
×
4117
                }
4118
                
4119
                // save current and move next
4120
                slot[i] = t;
×
4121
                t++;
×
4122
                infoPrint("slot[%d] start new thread tidx=%d. \n", i, slot[i]);
×
4123
            }
4124

4125
            // check slot empty
4126
            if(slot[i] == EMPTY_SLOT) {
×
4127
                emptySlot++;
×
4128
            }
4129
        }
4130

4131
        // check all thread end
4132
        if(emptySlot == limitThread) {
×
4133
            debugPrint("all threads(%d) is run finished.\n", nthreads);
×
4134
            break;
×
4135
        } else {
4136
            debugPrint("current thread index=%d all thread=%d\n", t, nthreads);
×
4137
        }
4138
    }
4139

4140
    return 0;
×
4141
}
4142
#endif
4143

4144
// run
4145
int32_t runInsertThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthreads, threadInfo *infos, pthread_t *pids) {
221✔
4146
    infoPrint("run insert thread. real nthread=%d\n", nthreads);
221✔
4147
    // create threads
4148
    int threadCnt = 0;
221✔
4149
    for (int i = 0; i < nthreads && !g_arguments->terminate; i++) {
878✔
4150
        threadInfo *pThreadInfo = infos + i;
657✔
4151
        if (stbInfo->interlaceRows > 0) {
657✔
4152
            pthread_create(pids + i, NULL, syncWriteInterlace,   pThreadInfo);
68✔
4153
        } else {
4154
            pthread_create(pids + i, NULL, syncWriteProgressive, pThreadInfo);
589✔
4155
        }
4156
        threadCnt ++;
657✔
4157
    }    
4158

4159
    // wait threads
4160
    for (int i = 0; i < threadCnt; i++) {
878✔
4161
        infoPrint("pthread_join %d ...\n", i);
657✔
4162
        pthread_join(pids[i], NULL);
657✔
4163
    }
4164

4165
    return 0;
221✔
4166
}
4167

4168

4169
// exit and free resource
4170
int32_t exitInsertThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthreads, threadInfo *infos, pthread_t *pids, int64_t spend) {
221✔
4171

4172
    if (g_arguments->terminate)  toolsMsleep(100);
221✔
4173

4174
    BArray *  total_delay_list = benchArrayInit(1, sizeof(int64_t));
221✔
4175
    int64_t   totalDelay = 0;
221✔
4176
    int64_t   totalDelay1 = 0;
221✔
4177
    int64_t   totalDelay2 = 0;
221✔
4178
    int64_t   totalDelay3 = 0;
221✔
4179
    uint64_t  totalInsertRows = 0;
221✔
4180

4181
    // free threads resource
4182
    for (int i = 0; i < nthreads; i++) {
878✔
4183
        threadInfo *pThreadInfo = infos + i;
657✔
4184
        // free check sql
4185
        if (pThreadInfo->csql) {
657✔
4186
            tmfree(pThreadInfo->csql);
4✔
4187
            pThreadInfo->csql = NULL;
4✔
4188
        }
4189

4190
        // close conn
4191
        int protocol = stbInfo->lineProtocol;
657✔
4192
        switch (stbInfo->iface) {
657✔
4193
            case REST_IFACE:
13✔
4194
                if (g_arguments->terminate)
13✔
4195
                    toolsMsleep(100);
×
4196
                destroySockFd(pThreadInfo->sockfd);
13✔
4197
                if (stbInfo->interlaceRows > 0) {
13✔
4198
                    free_ds(&pThreadInfo->buffer);
×
4199
                } else {
4200
                    tmfree(pThreadInfo->buffer);
13✔
4201
                    pThreadInfo->buffer = NULL;
13✔
4202
                }
4203
                break;
13✔
4204
            case SML_REST_IFACE:
37✔
4205
                if (g_arguments->terminate)
37✔
4206
                    toolsMsleep(100);
×
4207
                tmfree(pThreadInfo->buffer);
37✔
4208
                // on-purpose no break here
4209
            case SML_IFACE:
237✔
4210
                if (TSDB_SML_JSON_PROTOCOL != protocol
237✔
4211
                        && SML_JSON_TAOS_FORMAT != protocol) {
150✔
4212
                    for (int t = 0; t < pThreadInfo->ntables; t++) {
324✔
4213
                        tmfree(pThreadInfo->sml_tags[t]);
213✔
4214
                    }
4215
                    for (int j = 0; j < g_arguments->reqPerReq; j++) {
101,049✔
4216
                        tmfree(pThreadInfo->lines[j]);
100,938✔
4217
                    }
4218
                    tmfree(pThreadInfo->sml_tags);
111✔
4219
                    pThreadInfo->sml_tags = NULL;
111✔
4220
                } else {
4221
                    for (int t = 0; t < pThreadInfo->ntables; t++) {
372✔
4222
                        tmfree(pThreadInfo->sml_tags_json_array[t]);
246✔
4223
                    }
4224
                    tmfree(pThreadInfo->sml_tags_json_array);
126✔
4225
                    pThreadInfo->sml_tags_json_array = NULL;
126✔
4226
                    if (pThreadInfo->sml_json_tags) {
126✔
4227
                        tools_cJSON_Delete(pThreadInfo->sml_json_tags);
126✔
4228
                        pThreadInfo->sml_json_tags = NULL;
126✔
4229
                    }
4230
                    if (pThreadInfo->json_array) {
126✔
4231
                        tools_cJSON_Delete(pThreadInfo->json_array);
×
4232
                        pThreadInfo->json_array = NULL;
×
4233
                    }
4234
                }
4235
                if (pThreadInfo->lines) {
237✔
4236
                    if ((0 == stbInfo->interlaceRows)
237✔
4237
                            && (TSDB_SML_JSON_PROTOCOL == protocol)) {
201✔
4238
                        tmfree(pThreadInfo->lines[0]);
75✔
4239
                        for (int t = 0; t < pThreadInfo->ntables; t++) {
222✔
4240
                            tmfree(pThreadInfo->sml_json_value_array[t]);
147✔
4241
                        }
4242
                        tmfree(pThreadInfo->sml_json_value_array);
75✔
4243
                    }
4244
                    tmfree(pThreadInfo->lines);
237✔
4245
                    pThreadInfo->lines = NULL;
237✔
4246
                }
4247
                break;
237✔
4248

4249
            case STMT_IFACE:
59✔
4250
                // close stmt
4251
                if(pThreadInfo->conn->stmt) {
59✔
4252
                    taos_stmt_close(pThreadInfo->conn->stmt);
59✔
4253
                    pThreadInfo->conn->stmt = NULL;
59✔
4254
                }
4255
            case STMT2_IFACE:
4256
                // close stmt2
4257
                if (pThreadInfo->conn->stmt2) {
75✔
4258
                    taos_stmt2_close(pThreadInfo->conn->stmt2);
16✔
4259
                    pThreadInfo->conn->stmt2 = NULL;
16✔
4260
                }
4261

4262
                tmfree(pThreadInfo->bind_ts);
75✔
4263
                tmfree(pThreadInfo->bind_ts_array);
75✔
4264
                tmfree(pThreadInfo->bindParams);
75✔
4265
                tmfree(pThreadInfo->is_null);
75✔
4266
                
4267
                // free tagsStmt
4268
                BArray *tags = pThreadInfo->tagsStmt;
75✔
4269
                if(tags) {
75✔
4270
                    // free child
4271
                    for (int k = 0; k < tags->size; k++) {
452✔
4272
                        Field * tag = benchArrayGet(tags, k);
377✔
4273
                        tmfree(tag->stmtData.data);
377✔
4274
                        tag->stmtData.data = NULL;
377✔
4275
                        tmfree(tag->stmtData.is_null);
377✔
4276
                        tag->stmtData.is_null = NULL;
377✔
4277
                        tmfree(tag->stmtData.lengths);
377✔
4278
                        tag->stmtData.lengths = NULL;
377✔
4279
                    }
4280
                    // free parent
4281
                    benchArrayDestroy(tags);
75✔
4282
                    pThreadInfo->tagsStmt = NULL;
75✔
4283
                }
4284

4285
                // free lengths
4286
                if(pThreadInfo->lengths) {
75✔
4287
                    for(int c = 0; c <= stbInfo->cols->size; c++) {
730✔
4288
                        tmfree(pThreadInfo->lengths[c]);
655✔
4289
                    }
4290
                    free(pThreadInfo->lengths);
75✔
4291
                    pThreadInfo->lengths = NULL;
75✔
4292
                }
4293
                break;
75✔
4294

4295
            case TAOSC_IFACE:
332✔
4296
                if (stbInfo->interlaceRows > 0) {
332✔
4297
                    free_ds(&pThreadInfo->buffer);
18✔
4298
                } else {
4299
                    tmfree(pThreadInfo->buffer);
314✔
4300
                    pThreadInfo->buffer = NULL;
314✔
4301
                }
4302
                break;
332✔
4303

4304
            default:
×
4305
                break;
×
4306
        }
4307
        totalInsertRows += pThreadInfo->totalInsertRows;
657✔
4308
        totalDelay += pThreadInfo->totalDelay;
657✔
4309
        totalDelay1 += pThreadInfo->totalDelay1;
657✔
4310
        totalDelay2 += pThreadInfo->totalDelay2;
657✔
4311
        totalDelay3 += pThreadInfo->totalDelay3;
657✔
4312
        benchArrayAddBatch(total_delay_list, pThreadInfo->delayList->pData,
657✔
4313
                pThreadInfo->delayList->size, true);
657✔
4314
        tmfree(pThreadInfo->delayList);
657✔
4315
        pThreadInfo->delayList = NULL;
657✔
4316
        //  free conn
4317
        if (pThreadInfo->conn) {
657✔
4318
            closeBenchConn(pThreadInfo->conn);
607✔
4319
            pThreadInfo->conn = NULL;
607✔
4320
        }
4321
    }
4322

4323
    // calculate result
4324
    qsort(total_delay_list->pData, total_delay_list->size,
221✔
4325
            total_delay_list->elemSize, compare);
4326

4327
    if (g_arguments->terminate)  toolsMsleep(100);
221✔
4328

4329
    tmfree(pids);
221✔
4330
    tmfree(infos);
221✔
4331

4332
    // print result
4333
    int ret = printTotalDelay(database, totalDelay, totalDelay1, totalDelay2, totalDelay3,
221✔
4334
                              total_delay_list, nthreads, totalInsertRows, spend);
4335
    benchArrayDestroy(total_delay_list);
221✔
4336
    if (g_fail || ret != 0) {
221✔
4337
        return -1;
2✔
4338
    }
4339
    return 0;
219✔
4340
}
4341

4342
static int startMultiThreadInsertData(SDataBase* database, SSuperTable* stbInfo) {
222✔
4343
    if ((stbInfo->iface == SML_IFACE || stbInfo->iface == SML_REST_IFACE)
222✔
4344
            && !stbInfo->use_metric) {
72✔
4345
        errorPrint("%s", "schemaless cannot work without stable\n");
1✔
4346
        return -1;
1✔
4347
    }
4348

4349
    // check argument valid
4350
    preProcessArgument(stbInfo);
221✔
4351

4352
    // ntable
4353
    int64_t ntables = obtainTableCount(database, stbInfo);
221✔
4354
    if (ntables == 0) {
221✔
4355
        errorPrint("insert table count is zero. %s.%s\n", database->dbName, stbInfo->stbName);
×
4356
        return -1;
×
4357
    }
4358

4359
    // assign table to thread
4360
    int32_t  nthreads  = g_arguments->nthreads;
221✔
4361
    int64_t  div       = 0;  // ntable / nthread  division
221✔
4362
    int64_t  mod       = 0;  // ntable % nthread
221✔
4363
    int64_t  spend     = 0;
221✔
4364

4365
    if (g_arguments->bind_vgroup) {
221✔
4366
        nthreads = assignTableToThread(database, stbInfo);
4✔
4367
        if(nthreads == 0) {
4✔
4368
            errorPrint("bind vgroup assign theads count is zero. %s.%s\n", database->dbName, stbInfo->stbName);
×
4369
            return -1;
×
4370
        }
4371
    } else {
4372
        if(nthreads == 0) {
217✔
4373
            errorPrint("argument thread_count can not be zero. %s.%s\n", database->dbName, stbInfo->stbName);
×
4374
            return -1;
×
4375
        }
4376
        div = ntables / nthreads;
217✔
4377
        if (div < 1) {
217✔
4378
            nthreads = (int32_t)ntables;
76✔
4379
            div = 1;
76✔
4380
        }
4381
        mod = ntables % nthreads;
217✔
4382
    }
4383

4384

4385
    // init each thread information
4386
    pthread_t   *pids  = benchCalloc(1, nthreads * sizeof(pthread_t),  true);
221✔
4387
    threadInfo  *infos = benchCalloc(1, nthreads * sizeof(threadInfo), true);
221✔
4388

4389
    // init
4390
    int32_t ret = initInsertThread(database, stbInfo, nthreads, infos, div, mod);
221✔
4391
    if( ret != 0) {
221✔
4392
        errorPrint("init insert thread failed. %s.%s\n", database->dbName, stbInfo->stbName);
×
4393
        tmfree(pids);
×
4394
        tmfree(infos);
×
4395
        return ret;
×
4396
    }
4397

4398
    infoPrint("Estimate memory usage: %.2fMB\n", (double)g_memoryUsage / 1048576);
221✔
4399
    prompt(0);
221✔
4400

4401
   
4402
    // run
4403
    int64_t start = toolsGetTimestampUs();
221✔
4404
    if(g_arguments->bind_vgroup && g_arguments->nthreads < nthreads ) {
221✔
4405
        // need many batch execute all threads
4406
#ifdef LINUX        
4407
        ret = runInsertLimitThread(database, stbInfo, nthreads, g_arguments->nthreads, infos, pids);
×
4408
#else
4409
        ret = runInsertThread(database, stbInfo, nthreads, infos, pids);
4410
#endif        
4411
    } else {
4412
        // only one batch execute all threads
4413
        ret = runInsertThread(database, stbInfo, nthreads, infos, pids);
221✔
4414
    }
4415

4416
    int64_t end = toolsGetTimestampUs();
221✔
4417
    if(end == start) {
221✔
4418
        spend = 1;
×
4419
    } else {
4420
        spend = end - start;
221✔
4421
    }
4422

4423
    // exit
4424
    ret = exitInsertThread(database, stbInfo, nthreads, infos, pids, spend);
221✔
4425
    return ret;
221✔
4426
}
4427

4428
static int getStbInsertedRows(char* dbName, char* stbName, TAOS* taos) {
×
4429
    int rows = 0;
×
4430
    char command[SHORT_1K_SQL_BUFF_LEN];
4431
    snprintf(command, SHORT_1K_SQL_BUFF_LEN, "SELECT COUNT(*) FROM %s.%s",
×
4432
             dbName, stbName);
4433
    TAOS_RES* res = taos_query(taos, command);
×
4434
    int code = taos_errno(res);
×
4435
    if (code != 0) {
×
4436
        printErrCmdCodeStr(command, code, res);
×
4437
        return -1;
×
4438
    }
4439
    TAOS_ROW row = taos_fetch_row(res);
×
4440
    if (row == NULL) {
×
4441
        rows = 0;
×
4442
    } else {
4443
        rows = (int)*(int64_t*)row[0];
×
4444
    }
4445
    taos_free_result(res);
×
4446
    return rows;
×
4447
}
4448

4449
static void create_tsma(TSMA* tsma, SBenchConn* conn, char* stbName) {
×
4450
    char command[SHORT_1K_SQL_BUFF_LEN];
4451
    int len = snprintf(command, SHORT_1K_SQL_BUFF_LEN,
×
4452
                       "CREATE sma INDEX %s ON %s function(%s) "
4453
                       "INTERVAL (%s) SLIDING (%s)",
4454
                       tsma->name, stbName, tsma->func,
4455
                       tsma->interval, tsma->sliding);
4456
    if (tsma->custom) {
×
4457
        snprintf(command + len, SHORT_1K_SQL_BUFF_LEN - len,
×
4458
                 " %s", tsma->custom);
4459
    }
4460
    int code = queryDbExecCall(conn, command);
×
4461
    if (code == 0) {
×
4462
        infoPrint("successfully create tsma with command <%s>\n", command);
×
4463
    }
4464
}
×
4465

4466
static void* create_tsmas(void* args) {
×
4467
    tsmaThreadInfo* pThreadInfo = (tsmaThreadInfo*) args;
×
4468
    int inserted_rows = 0;
×
4469
    SBenchConn* conn = initBenchConn();
×
4470
    if (NULL == conn) {
×
4471
        return NULL;
×
4472
    }
4473
    int finished = 0;
×
4474
    if (taos_select_db(conn->taos, pThreadInfo->dbName)) {
×
4475
        errorPrint("failed to use database (%s)\n", pThreadInfo->dbName);
×
4476
        closeBenchConn(conn);
×
4477
        return NULL;
×
4478
    }
4479
    while (finished < pThreadInfo->tsmas->size && inserted_rows >= 0) {
×
4480
        inserted_rows = (int)getStbInsertedRows(
×
4481
                pThreadInfo->dbName, pThreadInfo->stbName, conn->taos);
4482
        for (int i = 0; i < pThreadInfo->tsmas->size; i++) {
×
4483
            TSMA* tsma = benchArrayGet(pThreadInfo->tsmas, i);
×
4484
            if (!tsma->done &&  inserted_rows >= tsma->start_when_inserted) {
×
4485
                create_tsma(tsma, conn, pThreadInfo->stbName);
×
4486
                tsma->done = true;
×
4487
                finished++;
×
4488
                break;
×
4489
            }
4490
        }
4491
        toolsMsleep(10);
×
4492
    }
4493
    benchArrayDestroy(pThreadInfo->tsmas);
×
4494
    closeBenchConn(conn);
×
4495
    return NULL;
×
4496
}
4497

4498
static int32_t createStream(SSTREAM* stream) {
2✔
4499
    int32_t code = -1;
2✔
4500
    char * command = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, false);
2✔
4501
    snprintf(command, TSDB_MAX_ALLOWED_SQL_LEN, "DROP STREAM IF EXISTS %s",
2✔
4502
             stream->stream_name);
2✔
4503
    infoPrint("%s\n", command);
2✔
4504
    SBenchConn* conn = initBenchConn();
2✔
4505
    if (NULL == conn) {
2✔
4506
        goto END_STREAM;
×
4507
    }
4508

4509
    code = queryDbExecCall(conn, command);
2✔
4510
    int32_t trying = g_arguments->keep_trying;
2✔
4511
    while (code && trying) {
2✔
4512
        infoPrint("will sleep %"PRIu32" milliseconds then re-drop stream %s\n",
×
4513
                          g_arguments->trying_interval, stream->stream_name);
4514
        toolsMsleep(g_arguments->trying_interval);
×
4515
        code = queryDbExecCall(conn, command);
×
4516
        if (trying != -1) {
×
4517
            trying--;
×
4518
        }
4519
    }
4520

4521
    if (code) {
2✔
4522
        closeBenchConn(conn);
×
4523
        goto END_STREAM;
×
4524
    }
4525

4526
    memset(command, 0, TSDB_MAX_ALLOWED_SQL_LEN);
2✔
4527
    int pos = snprintf(command, TSDB_MAX_ALLOWED_SQL_LEN,
2✔
4528
            "CREATE STREAM IF NOT EXISTS %s ", stream->stream_name);
2✔
4529
    if (stream->trigger_mode[0] != '\0') {
2✔
4530
        pos += snprintf(command + pos, TSDB_MAX_ALLOWED_SQL_LEN - pos,
1✔
4531
                "TRIGGER %s ", stream->trigger_mode);
1✔
4532
    }
4533
    if (stream->watermark[0] != '\0') {
2✔
4534
        pos += snprintf(command + pos, TSDB_MAX_ALLOWED_SQL_LEN - pos,
1✔
4535
                "WATERMARK %s ", stream->watermark);
1✔
4536
    }
4537
    if (stream->ignore_update[0] != '\0') {
2✔
4538
        pos += snprintf(command + pos, TSDB_MAX_ALLOWED_SQL_LEN - pos,
1✔
4539
                "IGNORE UPDATE %s ", stream->ignore_update);
1✔
4540
    }
4541
    if (stream->ignore_expired[0] != '\0') {
2✔
4542
        pos += snprintf(command + pos, TSDB_MAX_ALLOWED_SQL_LEN - pos,
1✔
4543
                "IGNORE EXPIRED %s ", stream->ignore_expired);
1✔
4544
    }
4545
    if (stream->fill_history[0] != '\0') {
2✔
4546
        pos += snprintf(command + pos, TSDB_MAX_ALLOWED_SQL_LEN - pos,
1✔
4547
                "FILL_HISTORY %s ", stream->fill_history);
1✔
4548
    }
4549
    pos += snprintf(command + pos, TSDB_MAX_ALLOWED_SQL_LEN - pos,
2✔
4550
            "INTO %s ", stream->stream_stb);
2✔
4551
    if (stream->stream_stb_field[0] != '\0') {
2✔
4552
        pos += snprintf(command + pos, TSDB_MAX_ALLOWED_SQL_LEN - pos,
1✔
4553
                "%s ", stream->stream_stb_field);
1✔
4554
    }
4555
    if (stream->stream_tag_field[0] != '\0') {
2✔
4556
        pos += snprintf(command + pos, TSDB_MAX_ALLOWED_SQL_LEN - pos,
1✔
4557
                "TAGS%s ", stream->stream_tag_field);
1✔
4558
    }
4559
    if (stream->subtable[0] != '\0') {
2✔
4560
        pos += snprintf(command + pos, TSDB_MAX_ALLOWED_SQL_LEN - pos,
1✔
4561
                "SUBTABLE%s ", stream->subtable);
1✔
4562
    }
4563
    snprintf(command + pos, TSDB_MAX_ALLOWED_SQL_LEN - pos,
2✔
4564
            "as %s", stream->source_sql);
2✔
4565
    infoPrint("%s\n", command);
2✔
4566

4567
    code = queryDbExecCall(conn, command);
2✔
4568
    trying = g_arguments->keep_trying;
2✔
4569
    while (code && trying) {
2✔
4570
        infoPrint("will sleep %"PRIu32" milliseconds "
×
4571
                  "then re-create stream %s\n",
4572
                  g_arguments->trying_interval, stream->stream_name);
4573
        toolsMsleep(g_arguments->trying_interval);
×
4574
        code = queryDbExecCall(conn, command);
×
4575
        if (trying != -1) {
×
4576
            trying--;
×
4577
        }
4578
    }
4579

4580
    closeBenchConn(conn);
2✔
4581
END_STREAM:
2✔
4582
    tmfree(command);
2✔
4583
    return code;
2✔
4584
}
4585

4586
void changeGlobalIface() {
166✔
4587
    if (g_arguments->databases->size == 1) {
166✔
4588
            SDataBase *db = benchArrayGet(g_arguments->databases, 0);
165✔
4589
            if (db && db->superTbls->size == 1) {
165✔
4590
                SSuperTable *stb = benchArrayGet(db->superTbls, 0);
137✔
4591
                if (stb) {
137✔
4592
                    if(g_arguments->iface != stb->iface) {
137✔
4593
                        infoPrint("only 1 db 1 super table, g_arguments->iface(%d) replace with stb->iface(%d) \n", g_arguments->iface, stb->iface);
15✔
4594
                        g_arguments->iface = stb->iface;
15✔
4595
                    }
4596
                }
4597
            }
4598
    }
4599
}
166✔
4600

4601
int insertTestProcess() {
166✔
4602
    prompt(0);
166✔
4603

4604
    encodeAuthBase64();
166✔
4605
    // if only one stable, global iface same with stable->iface
4606
    changeGlobalIface();
166✔
4607

4608
    //loop create database 
4609
    for (int i = 0; i < g_arguments->databases->size; i++) {
328✔
4610
        if (isRest(g_arguments->iface)) {
167✔
4611
            if (0 != convertServAddr(g_arguments->iface,
9✔
4612
                                     false,
4613
                                     1)) {
4614
                return -1;
×
4615
            }
4616
        }
4617
        SDataBase * database = benchArrayGet(g_arguments->databases, i);
167✔
4618

4619
        if (database->drop && !(g_arguments->supplementInsert)) {
167✔
4620
            if (database->superTbls && database->superTbls->size > 0) {
156✔
4621
                SSuperTable * stbInfo = benchArrayGet(database->superTbls, 0);
155✔
4622
                if (stbInfo && isRest(stbInfo->iface)) {
155✔
4623
                    if (0 != convertServAddr(stbInfo->iface,
13✔
4624
                                             stbInfo->tcpTransfer,
13✔
4625
                                             stbInfo->lineProtocol)) {
13✔
4626
                        return -1;
×
4627
                    }
4628
                }
4629
            }
4630
            if (createDatabase(database)) {
156✔
4631
                errorPrint("failed to create database (%s)\n",
5✔
4632
                        database->dbName);
4633
                return -1;
5✔
4634
            }
4635
            succPrint("created database (%s)\n", database->dbName);
151✔
4636
        } else if(g_arguments->bind_vgroup) {
11✔
4637
            // database already exist, get vgroups from server
4638
            SBenchConn* conn = initBenchConn();
2✔
4639
            if (conn) {
2✔
4640
                int32_t vgroups;
4641
#ifdef WEBSOCKET
4642
                if (g_arguments->websocket) {
2✔
4643
                    vgroups = getVgroupsWS(conn, database);
1✔
4644
                } else {
4645
#endif
4646
                    vgroups = getVgroupsNative(conn, database);
1✔
4647
#ifdef WEBSOCKET
4648
                }
4649
#endif
4650
                if (vgroups <=0) {
2✔
4651
                    closeBenchConn(conn);
×
4652
                    errorPrint("Database %s's vgroups is zero , db exist case.\n", database->dbName);
×
4653
                    return -1;
×
4654
                }
4655
                closeBenchConn(conn);
2✔
4656
                succPrint("Database (%s) get vgroups num is %d from server.\n", database->dbName, vgroups);
2✔
4657
            }
4658
        }
4659
    }
4660

4661
    // create super table && fill child tables && prepareSampleData
4662
    for (int i = 0; i < g_arguments->databases->size; i++) {
323✔
4663
        SDataBase * database = benchArrayGet(g_arguments->databases, i);
162✔
4664
        if (database->superTbls) {
162✔
4665
            for (int j = 0; j < database->superTbls->size; j++) {
391✔
4666
                SSuperTable * stbInfo = benchArrayGet(database->superTbls, j);
229✔
4667
                if (stbInfo->iface != SML_IFACE
229✔
4668
                        && stbInfo->iface != SML_REST_IFACE
171✔
4669
                        && !stbInfo->childTblExists) {
157✔
4670
#ifdef WEBSOCKET
4671
                    if (g_arguments->websocket && !g_arguments->supplementInsert) {
148✔
4672
                        dropSuperTable(database, stbInfo);
3✔
4673
                    }
4674
#endif
4675
                    int code = getSuperTableFromServer(database, stbInfo);
148✔
4676
                    if (code == TSDB_CODE_FAILED) {
148✔
4677
                        return -1;
×
4678
                    }
4679
                    
4680
                    // with create table if not exists, so if exist, can not report failed
4681
                    if (createSuperTable(database, stbInfo)) {
148✔
4682
                        return -1;
×
4683
                    }
4684
                    
4685
                }
4686
                // fill last ts from super table
4687
                if(stbInfo->autoFillback && stbInfo->childTblExists) {
229✔
4688
                    fillSTableLastTs(database, stbInfo);
×
4689
                }
4690

4691
                // calc now 
4692
                if(stbInfo->calcNow) {
229✔
4693
                    calcExprFromServer(database, stbInfo);
×
4694
                }
4695

4696
                // check fill child table count valid
4697
                if(fillChildTblName(database, stbInfo) <= 0) {
229✔
4698
                    infoPrint(" warning fill childs table count is zero, db:%s stb: %s \n", database->dbName, stbInfo->stbName);
3✔
4699
                }
4700
                if (0 != prepareSampleData(database, stbInfo)) {
229✔
4701
                    return -1;
×
4702
                }
4703

4704
                // early malloc buffer for auto create table
4705
                if((stbInfo->iface == STMT_IFACE || stbInfo->iface == STMT2_IFACE) && stbInfo->autoTblCreating) {
229✔
4706
                    prepareTagsStmt(stbInfo);
3✔
4707
                }
4708

4709
                // execute sqls
4710
                if (stbInfo->sqls) {
229✔
4711
                    char **sqls = stbInfo->sqls;
×
4712
                    while (*sqls) {
×
4713
                        queryDbExec(database, stbInfo, *sqls);
×
4714
                        sqls++;
×
4715
                    } 
4716
                }
4717
            }
4718
        }
4719
    }
4720

4721
    // tsma
4722
    if (g_arguments->taosc_version == 3) {
161✔
4723
        for (int i = 0; i < g_arguments->databases->size; i++) {
323✔
4724
            SDataBase* database = benchArrayGet(g_arguments->databases, i);
162✔
4725
            if (database->superTbls) {
162✔
4726
                for (int j = 0; (j < database->superTbls->size
162✔
4727
                        && !g_arguments->terminate); j++) {
391✔
4728
                    SSuperTable* stbInfo =
4729
                        benchArrayGet(database->superTbls, j);
229✔
4730
                    if (stbInfo->tsmas == NULL) {
229✔
4731
                        continue;
80✔
4732
                    }
4733
                    if (stbInfo->tsmas->size > 0) {
149✔
4734
                        tsmaThreadInfo* pThreadInfo =
4735
                            benchCalloc(1, sizeof(tsmaThreadInfo), true);
×
4736
                        pthread_t tsmas_pid = {0};
×
4737
                        pThreadInfo->dbName = database->dbName;
×
4738
                        pThreadInfo->stbName = stbInfo->stbName;
×
4739
                        pThreadInfo->tsmas = stbInfo->tsmas;
×
4740
                        pthread_create(&tsmas_pid, NULL,
×
4741
                                       create_tsmas, pThreadInfo);
4742
                    }
4743
                }
4744
            }
4745
        }
4746
    }
4747

4748
    if (createChildTables()) return -1;
161✔
4749

4750
    if (g_arguments->taosc_version == 3) {
161✔
4751
        for (int j = 0; j < g_arguments->streams->size; j++) {
163✔
4752
            SSTREAM * stream = benchArrayGet(g_arguments->streams, j);
2✔
4753
            if (stream->drop) {
2✔
4754
                if (createStream(stream)) {
2✔
4755
                    return -1;
×
4756
                }
4757
            }
4758
        }
4759
    }
4760

4761
    // create sub threads for inserting data
4762
    for (int i = 0; i < g_arguments->databases->size; i++) {
320✔
4763
        SDataBase * database = benchArrayGet(g_arguments->databases, i);
162✔
4764
        if (database->superTbls) {
162✔
4765
            for (uint64_t j = 0; j < database->superTbls->size; j++) {
386✔
4766
                SSuperTable * stbInfo = benchArrayGet(database->superTbls, j);
227✔
4767
                if (stbInfo->insertRows == 0) {
227✔
4768
                    continue;
5✔
4769
                }
4770
                prompt(stbInfo->non_stop);
222✔
4771
                if (startMultiThreadInsertData(database, stbInfo)) {
222✔
4772
                    return -1;
3✔
4773
                }
4774
            }
4775
        }
4776
    }
4777
    return 0;
158✔
4778
}
4779

4780
//
4781
//     ------- STMT 2 -----------
4782
//
4783

4784
static int32_t stmt2BindAndSubmit(
133✔
4785
        threadInfo *pThreadInfo,
4786
        SChildTable *childTbl,
4787
        int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt, int64_t *delay1,
4788
        int64_t *delay3, int64_t* startTs, int64_t* endTs, int32_t w) {
4789
    
4790
    // create bindV
4791
    int32_t count            = 1;
133✔
4792
    TAOS_STMT2_BINDV * bindv = createBindV(count, 0, 0);
133✔
4793
    TAOS_STMT2 *stmt2        = pThreadInfo->conn->stmt2;
133✔
4794
    SSuperTable *stbInfo     = pThreadInfo->stbInfo;
133✔
4795

4796
    //
4797
    // bind
4798
    //
4799

4800
    // count
4801
    bindv->count = 1;
133✔
4802
    // tbnames
4803
    bindv->tbnames[0] = childTbl->name;
133✔
4804
    // tags
4805
    //bindv->tags[0] = NULL; // Progrssive mode tag put on prepare sql, no need put here
4806
   
4807
    // bind_cols
4808
    uint32_t batch = (g_arguments->reqPerReq > stbInfo->insertRows - i) ? (stbInfo->insertRows - i) : g_arguments->reqPerReq;
133✔
4809
    int32_t n = 0;
133✔
4810
    int64_t pos = i % g_arguments->prepared_rand;
133✔
4811

4812
    // adjust batch about pos
4813
    if(g_arguments->prepared_rand - pos < batch ) {
133✔
4814
        debugPrint("prepared_rand(%" PRId64 ") is not a multiple of num_of_records_per(%d), the batch size can be modify. before=%d after=%d\n", 
5✔
4815
                    (int64_t)g_arguments->prepared_rand, (int32_t)g_arguments->reqPerReq, (int32_t)batch, (int32_t)(g_arguments->prepared_rand - pos));
4816
        batch = g_arguments->prepared_rand - pos;
5✔
4817
    } 
4818

4819
    if (batch == 0) {
133✔
4820
        infoPrint("batch size is zero. pos = %"PRId64"\n", pos);
×
4821
        return 0;
×
4822
    }
4823

4824
    uint32_t generated = bindVColsProgressive(bindv, 0, pThreadInfo, batch, *timestamp, pos, childTbl, pkCur, pkCnt, &n);
133✔
4825
    if(generated == 0) {
133✔
4826
        errorPrint( "get cols data bind information failed. table: %s\n", childTbl->name);
×
4827
        freeBindV(bindv);
×
4828
        return -1;
×
4829
    }
4830
    *timestamp += n * stbInfo->timestamp_step;
133✔
4831

4832
    if (g_arguments->debug_print) {
133✔
4833
        showBindV(bindv, stbInfo->tags, stbInfo->cols);
×
4834
    }
4835

4836
    // bind and submit
4837
    int32_t code = submitStmt2(pThreadInfo, bindv, delay1, delay3, startTs, endTs, &generated, w);
133✔
4838
    // free
4839
    freeBindV(bindv);
133✔
4840

4841
    if(code != 0) {
132✔
4842
        errorPrint( "failed submitStmt2() progressive mode, table: %s . engine error: %s\n", childTbl->name, taos_stmt2_error(stmt2));
×
4843
        return code;
×
4844
    } else {
4845
        debugPrint("succ submitStmt2 progressive mode. table=%s batch=%d pos=%" PRId64 " ts=%" PRId64 " generated=%d\n",
132✔
4846
                childTbl->name, batch, pos, *timestamp, generated);
4847
        return generated;
133✔
4848
    }
4849
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc