• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5051

13 May 2026 12:00PM UTC coverage: 73.358% (-0.04%) from 73.398%
#5051

push

travis-ci

web-flow
feat: taosdump support stream backup/restore (#35326)

139 of 170 new or added lines in 3 files covered. (81.76%)

714 existing lines in 146 files now uncovered.

281543 of 383795 relevant lines covered (73.36%)

135448694.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.21
/tools/taosBenchmark/src/benchQuery.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the MIT license as published by the Free Software
6
 * Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 */
12

13
#include <bench.h>
14
#include "benchLog.h"
15

16
// query and get result  record is true to total request
17
int selectAndGetResult(qThreadInfo *pThreadInfo, char *command, bool record) {
248,878✔
18
    int ret = 0;
248,878✔
19

20
    // user cancel
21
    if (g_arguments->terminate) {
248,878✔
22
        return -1;
×
23
    }
24

25
    // execute sql
26
    uint32_t threadID = pThreadInfo->threadID;
248,845✔
27
    char dbName[TSDB_DB_NAME_LEN] = {0};
248,550✔
28
    TOOLS_STRNCPY(dbName, g_queryInfo.dbName, TSDB_DB_NAME_LEN);
248,690✔
29

30
    if (g_queryInfo.iface == REST_IFACE) {
248,511✔
31
        int retCode = postProcessSql(command, g_queryInfo.dbName, 0, REST_IFACE,
3,072✔
32
                                   0, g_arguments->port, false,
1,536✔
33
                                   pThreadInfo->sockfd, pThreadInfo->filePath);
1,536✔
34
        if (0 != retCode) {
1,536✔
35
            errorPrint("====restful return fail, threadID[%u]\n",
×
36
                       threadID);
37
            ret = -1;
×
38
        }
39
    } else {
40
        // query
41
        TAOS *taos = pThreadInfo->conn->taos;
247,248✔
42
        int64_t rows  = 0;
247,427✔
43
        TAOS_RES *res = taos_query(taos, command);
247,427✔
44
        int code = taos_errno(res);
247,427✔
45
        if (res == NULL || code) {
247,371✔
46
            // failed query
47
            errorPrint("failed to execute sql:%s, "
247✔
48
                        "code: 0x%08x, reason:%s\n",
49
                        command, code, taos_errstr(res));
50
            ret = -1;
237✔
51
        } else {
52
            // succ query
53
            if (record)
247,124✔
54
                rows = fetchResult(res, pThreadInfo->filePath);
246,950✔
55
        }
56

57
        // free result
58
        if (res) {
247,427✔
59
            taos_free_result(res);
247,427✔
60
        }
61
        debugPrint("query sql:%s rows:%"PRId64"\n", command, rows);
247,115✔
62
    }
63

64
    // record count
65
    if (ret ==0) {
248,731✔
66
        // succ
67
        if (record)
248,494✔
68
            pThreadInfo->nSucc ++;
248,315✔
69
    } else {
70
        // fail
71
        if (record)
237✔
72
            pThreadInfo->nFail ++;
237✔
73

74
        // continue option
75
        if (YES_IF_FAILED == g_arguments->continueIfFail) {
237✔
76
            ret = 0; // force continue
×
77
        }
78
    }
79

80
    return ret;
248,750✔
81
}
82

83
// Intelligent sleep: sleep for what is left of `interval` (milliseconds)
// after `delay` (microseconds) has already been spent on the request.
// Returns the number of milliseconds slept, or 0 when no sleep was needed.
int32_t autoSleep(uint64_t interval, uint64_t delay ) {
    // the request already used up the whole interval
    if (delay >= interval * 1000) {
        return 0;
    }

    int32_t remainMs = (int32_t)((interval - delay/1000));
    infoPrint("do sleep %dms ...\n", remainMs);
    toolsMsleep(remainMs);  // ms
    debugPrint("%s\n","do sleep end");
    return remainMs;
}
94

95
// reset
96
int32_t resetQueryCache(qThreadInfo* pThreadInfo) {
240✔
97
    // execute sql
98
    if (selectAndGetResult(pThreadInfo, "RESET QUERY CACHE", false)) {
240✔
99
        errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__);
×
100
        return -1;
×
101
    }
102
    // succ
103
    return 0;
240✔
104
}
105

106

107

108
//
109
//  ---------------------------------  second level functions for Thread -----------------------------------
110
//
111

112
// show rela qps
113
int64_t showRealQPS(qThreadInfo* pThreadInfo, int64_t lastPrintTime, int64_t startTs) {
247,969✔
114
    int64_t now = toolsGetTimestampMs();
247,969✔
115
    if (now - lastPrintTime > 10 * 1000) {
247,568✔
116
        // real total
117
        uint64_t totalQueried = pThreadInfo->nSucc;
4,212✔
118
        if(g_arguments->continueIfFail == YES_IF_FAILED) {
4,885✔
119
            totalQueried += pThreadInfo->nFail;
1,140✔
120
        }
121
        infoPrint(
4,885✔
122
            "thread[%d] has currently completed queries: %" PRIu64 ", QPS: %10.3f\n",
123
            pThreadInfo->threadID, totalQueried,
124
            (double)(totalQueried / ((now - startTs) / 1000.0)));
125
        return now;
4,885✔
126
    } else {
127
        return lastPrintTime;
243,372✔
128
    }
129
}
130

131
// spec query mixed thread
132
static void *specQueryMixThread(void *sarg) {
1,736✔
133
    qThreadInfo *pThreadInfo = (qThreadInfo*)sarg;
1,736✔
134
#ifdef LINUX
135
    prctl(PR_SET_NAME, "specQueryMixThread");
1,736✔
136
#endif
137
    // use db
138
    if (g_queryInfo.dbName) {
1,736✔
139
        if (pThreadInfo->conn &&
1,736✔
140
            pThreadInfo->conn->taos &&
3,472✔
141
            taos_select_db(pThreadInfo->conn->taos, g_queryInfo.dbName)) {
1,736✔
142
                errorPrint("thread[%d]: failed to select database(%s)\n", pThreadInfo->threadID, g_queryInfo.dbName);
×
143
                return NULL;
×
144
        }
145
    }
146

147
    int64_t st = 0;
1,736✔
148
    int64_t et = 0;
1,736✔
149
    int64_t startTs        = toolsGetTimestampMs();
1,736✔
150
    int64_t lastPrintTime  = startTs;
1,651✔
151
    // batchQuery
152
    bool     batchQuery    = g_queryInfo.specifiedQueryInfo.batchQuery;
1,651✔
153
    uint64_t queryTimes    = batchQuery ? 1 : g_queryInfo.specifiedQueryInfo.queryTimes;
1,651✔
154
    uint64_t interval      = batchQuery ? 0 : g_queryInfo.specifiedQueryInfo.queryInterval;
1,736✔
155

156
    pThreadInfo->query_delay_list = benchArrayInit(queryTimes, sizeof(int64_t));
1,736✔
157
    for (int i = pThreadInfo->start_sql; i <= pThreadInfo->end_sql; ++i) {
3,472✔
158
        SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
1,711✔
159
        for (uint64_t j = 0; j < queryTimes; ++j) {
16,086✔
160
            // use cancel
161
            if(g_arguments->terminate) {
14,350✔
162
                infoPrint("%s\n", "user cancel , so exit testing.");
×
163
                break;
×
164
            }
165

166
            // reset cache
167
            if (g_queryInfo.reset_query_cache) {
14,290✔
168
                if (resetQueryCache(pThreadInfo)) {
×
169
                    errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__);
×
170
                    return NULL;
×
171
                }
172
            }
173

174
            // execute sql
175
            st = toolsGetTimestampUs();
14,290✔
176
            int ret = selectAndGetResult(pThreadInfo, sql->command, true);
14,350✔
177
            if (ret) {
14,350✔
178
                g_fail = true;
×
179
                errorPrint("failed call mix selectAndGetResult, i=%d j=%" PRIu64 "", i, j);
×
180
                return NULL;
×
181
            }
182
            et = toolsGetTimestampUs();
14,350✔
183

184
            // sleep
185
            if (interval > 0) {
14,350✔
186
                autoSleep(interval, et - st);
100✔
187
            }
188

189
            // delay
190
            if (ret == 0) {
14,350✔
191
                int64_t* delay = benchCalloc(1, sizeof(int64_t), false);
14,350✔
192
                *delay = et - st;
14,350✔
193
                debugPrint("%s() LN%d, delay: %"PRId64"\n", __func__, __LINE__, *delay);
14,350✔
194

195
                pThreadInfo->total_delay += *delay;
14,350✔
196
                if(benchArrayPush(pThreadInfo->query_delay_list, delay) == NULL){
14,350✔
197
                    tmfree(delay);
×
198
                }
199
            }
200

201
            // real show
202
            lastPrintTime = showRealQPS(pThreadInfo, lastPrintTime, startTs);
14,350✔
203
        }
204
    }
205

206
    return NULL;
1,736✔
207
}
208

209
// spec query thread
210
static void *specQueryThread(void *sarg) {
4,889✔
211
    qThreadInfo *pThreadInfo = (qThreadInfo *)sarg;
4,889✔
212
#ifdef LINUX
213
    prctl(PR_SET_NAME, "specQueryThread");
4,889✔
214
#endif
215
    uint64_t st = 0;
4,889✔
216
    uint64_t et = 0;
4,889✔
217
    int32_t  index = 0;
4,889✔
218

219
    // use db
220
    if (g_queryInfo.dbName) {
4,889✔
221
        if (pThreadInfo->conn &&
4,889✔
222
            pThreadInfo->conn->taos &&
8,686✔
223
            taos_select_db(pThreadInfo->conn->taos, g_queryInfo.dbName)) {
4,343✔
224
                errorPrint("thread[%d]: failed to select database(%s)\n", pThreadInfo->threadID, g_queryInfo.dbName);
×
225
                return NULL;
×
226
        }
227
    }
228

229
    uint64_t  queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes;
4,889✔
230
    uint64_t  interval   = g_queryInfo.specifiedQueryInfo.queryInterval;
4,889✔
231
    pThreadInfo->query_delay_list = benchArrayInit(queryTimes, sizeof(int64_t));
4,889✔
232

233
    uint64_t  startTs       = toolsGetTimestampMs();
4,889✔
234
    uint64_t  lastPrintTime = startTs;
4,889✔
235

236
    SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, pThreadInfo->querySeq);
4,889✔
237

238
    if (sql->result[0] != '\0') {
4,889✔
239
        (void)snprintf(pThreadInfo->filePath, MAX_PATH_LEN, "%s-%d",
8,704✔
240
                sql->result, pThreadInfo->threadID);
4,352✔
241
    }
242

243
    while (index < (int64_t)queryTimes) {
52,657✔
244
        // use cancel
245
        if(g_arguments->terminate) {
48,011✔
246
            infoPrint("thread[%d] user cancel , so exit testing.\n", pThreadInfo->threadID);
×
247
            break;
×
248
        }
249

250
        // reset cache
251
        if (g_queryInfo.reset_query_cache) {
48,306✔
252
            if (resetQueryCache(pThreadInfo)) {
180✔
253
                errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__);
×
254
                return NULL;
×
255
            }
256
        }
257

258
        // execute sql
259
        st = toolsGetTimestampUs();
48,306✔
260
        int ret = selectAndGetResult(pThreadInfo, sql->command, true);
48,329✔
261
        if (ret) {
48,323✔
262
            g_fail = true;
237✔
263
            errorPrint("failed call spec selectAndGetResult, index=%d\n", index);
237✔
264
            break;
237✔
265
        }
266
        et = toolsGetTimestampUs();
48,086✔
267

268
        // sleep
269
        if (interval > 0) {
48,027✔
270
            autoSleep(interval, et - st);
6,866✔
271
        }
272

273

274
        uint64_t delay = et - st;
47,998✔
275
        debugPrint("%s() LN%d, delay: %"PRIu64"\n", __func__, __LINE__, delay);
47,998✔
276

277
        if (ret == 0) {
48,063✔
278
            // only succ add delay list
279
            benchArrayPushNoFree(pThreadInfo->query_delay_list, &delay);
48,063✔
280
            pThreadInfo->total_delay += delay;
48,004✔
281
        }
282
        index++;
48,004✔
283

284
        // real show
285
        lastPrintTime = showRealQPS(pThreadInfo, lastPrintTime, startTs);
48,004✔
286
    }
287

288
    return NULL;
4,883✔
289
}
290

291
// super table query thread
292
static void *stbQueryThread(void *sarg) {
1,228✔
293
    char *sqlstr = benchCalloc(1, TOOLS_MAX_ALLOWED_SQL_LEN, false);
1,228✔
294
    qThreadInfo *pThreadInfo = (qThreadInfo *)sarg;
1,228✔
295
#ifdef LINUX
296
    prctl(PR_SET_NAME, "stbQueryThread");
1,228✔
297
#endif
298

299
    uint64_t st = 0;
1,228✔
300
    uint64_t et = 0;
1,228✔
301

302
    uint64_t queryTimes = g_queryInfo.superQueryInfo.queryTimes;
1,228✔
303
    uint64_t interval   = g_queryInfo.superQueryInfo.queryInterval;
1,228✔
304
    pThreadInfo->query_delay_list = benchArrayInit(queryTimes, sizeof(uint64_t));
1,228✔
305

306
    uint64_t startTs = toolsGetTimestampMs();
1,228✔
307
    uint64_t lastPrintTime = startTs;
1,228✔
308
    while (queryTimes--) {
16,150✔
309
        // use cancel
310
        if(g_arguments->terminate) {
14,998✔
311
            infoPrint("%s\n", "user cancel , so exit testing.");
×
312
            break;
×
313
        }
314

315
        // reset cache
316
        if (g_queryInfo.reset_query_cache) {
14,998✔
317
            if (resetQueryCache(pThreadInfo)) {
60✔
318
                errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__);
×
319
                return NULL;
×
320
            }
321
        }
322

323
        // execute
324
        st = toolsGetTimestampMs();
14,998✔
325
        // for each table
326
        for (int i = (int)pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) {
61,948✔
327
            // use cancel
328
            if(g_arguments->terminate) {
47,010✔
UNCOV
329
                infoPrint("%s\n", "user cancel , so exit testing.");
×
330
                break;
×
331
            }
332

333
            // for each sql
334
            for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
232,724✔
335
                memset(sqlstr, 0, TOOLS_MAX_ALLOWED_SQL_LEN);
185,536✔
336
                // use cancel
337
                if(g_arguments->terminate) {
185,536✔
338
                    infoPrint("%s\n", "user cancel , so exit testing.");
×
339
                    break;
×
340
                }
341

342
                // get real child name sql
343
                if (replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr, i)) {
186,044✔
344
                    // fault
345
                    tmfree(sqlstr);
×
346
                    return NULL;
×
347
                }
348

349
                if (g_queryInfo.superQueryInfo.result[j][0] != '\0') {
186,044✔
350
                    (void)snprintf(pThreadInfo->filePath, MAX_PATH_LEN, "%s-%d",
296,034✔
351
                            g_queryInfo.superQueryInfo.result[j],
147,990✔
352
                            pThreadInfo->threadID);
353
                }
354

355
                // execute sql
356
                uint64_t s = toolsGetTimestampUs();
186,098✔
357
                int ret = selectAndGetResult(pThreadInfo, sqlstr, true);
186,006✔
358
                if (ret) {
185,834✔
359
                    // found error
360
                    errorPrint("failed call stb selectAndGetResult, i=%d j=%d\n", i, j);
×
361
                    g_fail = true;
×
362
                    tmfree(sqlstr);
×
363
                    return NULL;
×
364
                }
365
                uint64_t delay = toolsGetTimestampUs() - s;
185,834✔
366
                debugPrint("%s() LN%d, delay: %"PRIu64"\n", __func__, __LINE__, delay);
185,632✔
367
                if (ret == 0) {
185,840✔
368
                    // only succ add delay list
369
                    benchArrayPushNoFree(pThreadInfo->query_delay_list, &delay);
185,840✔
370
                    pThreadInfo->total_delay += delay;
185,450✔
371
                }
372

373
                // show real QPS
374
                lastPrintTime = showRealQPS(pThreadInfo, lastPrintTime, startTs);
185,506✔
375
            }
376
        }
377
        et = toolsGetTimestampMs();
14,938✔
378

379
        // sleep
380
        if (interval > 0) {
14,922✔
381
            autoSleep(interval, et - st);
1,918✔
382
        }
383

384
    }
385
    tmfree(sqlstr);
1,152✔
386

387
    return NULL;
1,228✔
388
}
389

390
//
391
// ---------------------------------  first level functions ------------------------------
392
//
393

394
/*
 * Summarise a finished group of query workers.
 *
 * Moves every worker's delay list into one sorted list (the per-worker lists
 * are destroyed and set to NULL), logs min / avg / p90 / p95 / p99 / max
 * delay, writes the same figures to g_arguments->output_json_file when
 * super-table SQLs are configured and a file name is set, and finally
 * appends the merged delays to pDelays when it is not NULL.
 *
 *   infos      array of threadCnt worker contexts
 *   threadCnt  number of entries in infos
 *   spend      wall time of the whole run, in microseconds
 *   pDelays    optional array that receives a copy of all delays
 */
void totalChildQuery(qThreadInfo* infos, int threadCnt, int64_t spend, BArray *pDelays) {
    // nothing to summarise
    if (NULL == infos || 0 == threadCnt) {
        return;
    }

    BArray *merged   = benchArrayInit(1, sizeof(int64_t));
    double  delaySum = 0;

    // drain each worker's delay list into the merged one
    for (int t = 0; t < threadCnt; ++t) {
        qThreadInfo *worker = infos + t;
        if (worker->query_delay_list != NULL) {
            benchArrayAddBatch(merged, worker->query_delay_list->pData,
                    worker->query_delay_list->size, false);
            delaySum += worker->total_delay;

            benchArrayDestroy(worker->query_delay_list);
            worker->query_delay_list = NULL;
        }
    }

    // no successful query at all
    if (0 == merged->size) {
        errorPrint("%s", "succ queries count is zero.\n");
        benchArrayDestroy(merged);
        return;
    }

    // ascending order, so percentiles are plain index lookups
    qsort(merged->pData, merged->size, merged->elemSize, compare);

    size_t totalQueried = merged->size;
    double time_cost    = spend / 1E6;
    double qps          = 0.0;
    if (time_cost > 0) {
        qps = totalQueried / time_cost;
    }

    int32_t idxMax = (int32_t)(merged->size - 1);
    int32_t idxP90 = (int32_t)(merged->size * 0.90);
    int32_t idxP95 = (int32_t)(merged->size * 0.95);
    int32_t idxP99 = (int32_t)(merged->size * 0.99);

    double avgDelay = (double)delaySum/merged->size/1E6;
    double minDelay = *(int64_t *)(benchArrayGet(merged, 0))/1E6;
    double maxDelay = *(int64_t *)(benchArrayGet(merged, idxMax))/1E6;
    double p90      = *(int64_t *)(benchArrayGet(merged, idxP90))/1E6;
    double p95      = *(int64_t *)(benchArrayGet(merged, idxP95))/1E6;
    double p99      = *(int64_t *)(benchArrayGet(merged, idxP99))/1E6;

    // the list is known to be non-empty here, so the summary is always shown
    infoPrint(
            "spend %.6fs using "
            "%d threads complete query %zu times,  "
            "min delay: %.6fs, "
            "avg delay: %.6fs, "
            "p90: %.6fs, "
            "p95: %.6fs, "
            "p99: %.6fs, "
            "max: %.6fs\n",
            time_cost,
            threadCnt, totalQueried,
            minDelay,
            avgDelay,
            p90,
            p95,
            p99,
            maxDelay);

    // json report for super table query, only when an output file is set
    if (g_queryInfo.superQueryInfo.sqlCount > 0 && g_arguments->output_json_file) {
        tools_cJSON *doc     = tools_cJSON_CreateObject();
        tools_cJSON *results = NULL;

        if (doc != NULL) {
            results = tools_cJSON_CreateArray();
            if (results == NULL) {
                errorPrint("Failed to create result_array JSON object\n");
                tools_cJSON_Delete(doc);
                doc = NULL;
            } else {
                tools_cJSON_AddItemToObject(doc, "results", results);
            }
        }

        if (results) {
            tools_cJSON *entry = tools_cJSON_CreateObject();
            if (!entry) {
                errorPrint("Failed to create JSON object for SQL result.\n");
            } else {
                tools_cJSON_AddNumberToObject(entry, "threads", threadCnt);
                tools_cJSON_AddNumberToObject(entry, "total_queries", totalQueried);
                tools_cJSON_AddNumberToObject(entry, "time_cost", time_cost);
                tools_cJSON_AddNumberToObject(entry, "qps", qps);
                tools_cJSON_AddNumberToObject(entry, "avg", avgDelay);
                tools_cJSON_AddNumberToObject(entry, "min", minDelay);
                tools_cJSON_AddNumberToObject(entry, "max", maxDelay);
                tools_cJSON_AddNumberToObject(entry, "p90", p90);
                tools_cJSON_AddNumberToObject(entry, "p95", p95);
                tools_cJSON_AddNumberToObject(entry, "p99", p99);
                tools_cJSON_AddItemToArray(results, entry);
            }
        }

        if (doc) {
            char *text = tools_cJSON_PrintUnformatted(doc);
            if (text) {
                FILE *out = fopen(g_arguments->output_json_file, "w");
                if (out == NULL) {
                    errorPrint("Failed to open output JSON file, file name %s\n",
                        g_arguments->output_json_file);
                } else {
                    fprintf(out, "%s\n", text);
                    fclose(out);
                }

                free(text);
            }
            tools_cJSON_Delete(doc);
        }
    }

    // hand a copy of the delays to the caller when asked for
    if (pDelays) {
        benchArrayAddBatch(pDelays, merged->pData, merged->size, false);
    }
    benchArrayDestroy(merged);
}
527

528
//
529
// super table query
530
//
531
static int stbQuery(uint16_t iface, char* dbName) {
439✔
532
    int ret = -1;
439✔
533
    pthread_t * pidsOfSub   = NULL;
439✔
534
    qThreadInfo *threadInfos = NULL;
439✔
535
    g_queryInfo.superQueryInfo.totalQueried = 0;
439✔
536
    g_queryInfo.superQueryInfo.totalFail    = 0;
439✔
537

538
    // check
539
    if ((g_queryInfo.superQueryInfo.sqlCount == 0)
439✔
540
        || (g_queryInfo.superQueryInfo.threadCnt == 0)) {
439✔
541
        return 0;
×
542
    }
543

544
    // malloc
545
    pidsOfSub = benchCalloc(1, g_queryInfo.superQueryInfo.threadCnt
439✔
546
                            *sizeof(pthread_t),
547
                            false);
548
    threadInfos = benchCalloc(1, g_queryInfo.superQueryInfo.threadCnt
439✔
549
                                *sizeof(qThreadInfo), false);
550

551
    int64_t ntables = g_queryInfo.superQueryInfo.childTblCount;
439✔
552
    int nConcurrent = g_queryInfo.superQueryInfo.threadCnt;
439✔
553

554
    int64_t a = ntables / nConcurrent;
439✔
555
    if (a < 1) {
439✔
556
        nConcurrent = (int)ntables;
81✔
557
        a = 1;
81✔
558
    }
559

560
    int64_t b = 0;
439✔
561
    if (nConcurrent != 0) {
439✔
562
        b = ntables % nConcurrent;
439✔
563
    }
564

565
    uint64_t tableFrom = 0;
439✔
566
    int threadCnt = 0;
439✔
567
    for (int i = 0; i < nConcurrent; i++) {
1,667✔
568
        qThreadInfo *pThreadInfo = threadInfos + i;
1,228✔
569
        pThreadInfo->dbName = dbName;
1,228✔
570
        pThreadInfo->threadID = i;
1,228✔
571
        pThreadInfo->start_table_from = tableFrom;
1,228✔
572
        pThreadInfo->ntables = i < b ? a + 1 : a;
1,228✔
573
        pThreadInfo->end_table_to =
1,228✔
574
                i < b ? tableFrom + a : tableFrom + a - 1;
1,228✔
575
        tableFrom = pThreadInfo->end_table_to + 1;
1,228✔
576
        // create conn
577
        if (initQueryConn(pThreadInfo, iface)){
1,228✔
578
            break;
×
579
        }
580
        int code = pthread_create(pidsOfSub + i, NULL, stbQueryThread, pThreadInfo);
1,228✔
581
        if (code != 0) {
1,228✔
582
            errorPrint("failed stbQueryThread create. error code =%d \n", code);
×
583
            break;
×
584
        }
585
        threadCnt ++;
1,228✔
586
    }
587

588
    bool needExit = false;
439✔
589
    // if failed, set termainte flag true like ctrl+c exit
590
    if (threadCnt != nConcurrent  ) {
439✔
591
        needExit = true;
×
592
        g_arguments->terminate = true;
×
593
        goto OVER;
×
594
    }
595

596
    // reset total
597
    g_queryInfo.superQueryInfo.totalQueried = 0;
439✔
598
    g_queryInfo.superQueryInfo.totalFail    = 0;
439✔
599

600
    // real thread count
601
    g_queryInfo.superQueryInfo.threadCnt = threadCnt;
439✔
602
    int64_t start = toolsGetTimestampUs();
439✔
603

604
    for (int i = 0; i < threadCnt; i++) {
1,667✔
605
        pthread_join(pidsOfSub[i], NULL);
1,228✔
606
        qThreadInfo *pThreadInfo = threadInfos + i;
1,228✔
607
        // add succ
608
        g_queryInfo.superQueryInfo.totalQueried += pThreadInfo->nSucc;
1,228✔
609
        if (g_arguments->continueIfFail == YES_IF_FAILED) {
1,228✔
610
            // "yes" need add fail cnt
611
            g_queryInfo.superQueryInfo.totalQueried += pThreadInfo->nFail;
276✔
612
            g_queryInfo.superQueryInfo.totalFail    += pThreadInfo->nFail;
276✔
613
        }
614

615
        // close conn
616
        closeQueryConn(pThreadInfo, iface);
1,228✔
617
    }
618
    int64_t end = toolsGetTimestampUs();
439✔
619

620
    if (needExit) {
439✔
621
        goto OVER;
×
622
    }
623

624
    // total show
625
    totalChildQuery(threadInfos, threadCnt, end - start, NULL);
439✔
626

627
    ret = 0;
439✔
628

629
OVER:
439✔
630
    tmfree((char *)pidsOfSub);
439✔
631
    tmfree((char *)threadInfos);
439✔
632

633
    for (int64_t i = 0; i < g_queryInfo.superQueryInfo.childTblCount; ++i) {
3,341✔
634
        tmfree(g_queryInfo.superQueryInfo.childTblName[i]);
2,902✔
635
    }
636
    tmfree(g_queryInfo.superQueryInfo.childTblName);
439✔
637
    return ret;
439✔
638
}
639

640
//
641
// specQuery
642
//
643
static int specQuery(uint16_t iface, char* dbName) {
608✔
644
    int ret = -1;
608✔
645
    pthread_t    *pids = NULL;
608✔
646
    qThreadInfo *infos = NULL;
608✔
647
    int    nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent;
608✔
648
    uint64_t nSqlCount = g_queryInfo.specifiedQueryInfo.sqls->size;
608✔
649
    g_queryInfo.specifiedQueryInfo.totalQueried = 0;
608✔
650
    g_queryInfo.specifiedQueryInfo.totalFail    = 0;
608✔
651

652
    // check invaid
653
    if(nSqlCount == 0 || nConcurrent == 0 ) {
608✔
654
        if(nSqlCount == 0)
×
655
           warnPrint("specified table query sql count is %" PRIu64 ".\n", nSqlCount);
×
656
        if(nConcurrent == 0)
×
657
           warnPrint("nConcurrent is %d , specified_table_query->nConcurrent is zero. \n", nConcurrent);
×
658
        return 0;
×
659
    }
660

661
    // malloc threads memory
662
    pids  = benchCalloc(1, nConcurrent * sizeof(pthread_t),  false);
608✔
663
    infos = benchCalloc(1, nConcurrent * sizeof(qThreadInfo), false);
608✔
664

665
    tools_cJSON *root = NULL;
608✔
666
    tools_cJSON *result_array = NULL;
608✔
667
    if (g_arguments->output_json_file) {
608✔
668
       root = tools_cJSON_CreateObject();
59✔
669
       if (root != NULL) {
59✔
670
            result_array = tools_cJSON_CreateArray();
59✔
671
            tools_cJSON_AddItemToObject(root, "results", result_array);
59✔
672
       }
673
    }
674

675
    for (uint64_t i = 0; i < nSqlCount; i++) {
2,336✔
676
        if( g_arguments->terminate ) {
1,807✔
677
            break;
×
678
        }
679

680
        // reset
681
        memset(pids,  0, nConcurrent * sizeof(pthread_t));
1,807✔
682
        memset(infos, 0, nConcurrent * sizeof(qThreadInfo));
1,807✔
683

684
        // get execute sql
685
        SSQL *sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
1,807✔
686

687
        // create threads
688
        int threadCnt = 0;
1,807✔
689
        for (int j = 0; j < nConcurrent; j++) {
6,696✔
690
           qThreadInfo *pThreadInfo = infos + j;
4,889✔
691
           pThreadInfo->threadID = i * nConcurrent + j;
4,889✔
692
           pThreadInfo->querySeq = i;
4,889✔
693
           pThreadInfo->dbName = dbName;
4,889✔
694

695
           // create conn
696
           if (initQueryConn(pThreadInfo, iface)) {
4,889✔
697
               break;
×
698
           }
699

700
           int code = pthread_create(pids + j, NULL, specQueryThread, pThreadInfo);
4,889✔
701
           if (code != 0) {
4,889✔
702
               errorPrint("failed specQueryThread create. error code =%d \n", code);
×
703
               break;
×
704
           }
705
           threadCnt++;
4,889✔
706
        }
707

708
        bool needExit = false;
1,807✔
709
        // if failed, set termainte flag true like ctrl+c exit
710
        if (threadCnt != nConcurrent  ) {
1,807✔
711
            needExit = true;
×
712
            g_arguments->terminate = true;
×
713
        }
714

715
        int64_t start = toolsGetTimestampUs();
1,807✔
716
        // wait threads execute finished one by one
717
        for (int j = 0; j < threadCnt ; j++) {
6,696✔
718
           pthread_join(pids[j], NULL);
4,889✔
719
           qThreadInfo *pThreadInfo = infos + j;
4,889✔
720
           closeQueryConn(pThreadInfo, iface);
4,889✔
721

722
           // need exit in loop
723
           if (needExit) {
4,889✔
724
                // free BArray
725
                benchArrayDestroy(pThreadInfo->query_delay_list);
×
726
                pThreadInfo->query_delay_list = NULL;
×
727
           }
728
        }
729
        int64_t spend = toolsGetTimestampUs() - start;
1,807✔
730
        if(spend == 0) {
1,807✔
731
            // avoid xx/spend expr throw error
732
            spend = 1;
×
733
        }
734

735
        // create
736
        if (needExit) {
1,807✔
737
            errorPrint("failed to create thread. expect nConcurrent=%d real threadCnt=%d,  exit testing.\n", nConcurrent, threadCnt);
×
738
            goto OVER;
×
739
        }
740

741
        //
742
        // show QPS and P90 ...
743
        //
744
        uint64_t n = 0;
1,807✔
745
        double  total_delays = 0.0;
1,807✔
746
        uint64_t totalQueried = 0;
1,807✔
747
        uint64_t totalFail    = 0;
1,807✔
748
        for (int j = 0; j < threadCnt; j++) {
6,696✔
749
           qThreadInfo *pThreadInfo = infos + j;
4,889✔
750
           if(pThreadInfo->query_delay_list == NULL) {
4,889✔
751
                continue;;
×
752
           }
753

754
           // total one sql
755
           for (uint64_t k = 0; k < pThreadInfo->query_delay_list->size; k++) {
52,981✔
756
                int64_t * delay = benchArrayGet(pThreadInfo->query_delay_list, k);
48,092✔
757
                sql->delay_list[n++] = *delay;
48,092✔
758
                total_delays += *delay;
48,092✔
759
           }
760

761
           // total queries
762
           totalQueried += pThreadInfo->nSucc;
4,889✔
763
           if (g_arguments->continueIfFail == YES_IF_FAILED) {
4,889✔
764
                totalQueried += pThreadInfo->nFail;
576✔
765
                totalFail    += pThreadInfo->nFail;
576✔
766
           }
767

768
           // free BArray query_delay_list
769
           benchArrayDestroy(pThreadInfo->query_delay_list);
4,889✔
770
           pThreadInfo->query_delay_list = NULL;
4,889✔
771
        }
772

773
        // appand current sql
774
        g_queryInfo.specifiedQueryInfo.totalQueried += totalQueried;
1,807✔
775
        g_queryInfo.specifiedQueryInfo.totalFail    += totalFail;
1,807✔
776

777
        // succ is zero
778
        if(totalQueried == 0 || n == 0) {
1,807✔
779
            errorPrint("%s", "succ queries count is zero.\n");
79✔
780
            goto OVER;
79✔
781
        }
782

783
        qsort(sql->delay_list, n, sizeof(*sql->delay_list), compare);
1,728✔
784

785
        double time_cost = spend / 1E6;
1,728✔
786
        double qps = totalQueried / time_cost;
1,728✔
787
        double avgDelay = total_delays / n / 1E6;
1,728✔
788
        double minDelay = sql->delay_list[0] / 1E6;
1,728✔
789
        double maxDelay = sql->delay_list[n - 1] / 1E6;
1,728✔
790
        double p90 = sql->delay_list[(uint64_t)(n * 0.90)] / 1E6;
1,728✔
791
        double p95 = sql->delay_list[(uint64_t)(n * 0.95)] / 1E6;
1,728✔
792
        double p99 = sql->delay_list[(uint64_t)(n * 0.99)] / 1E6;
1,728✔
793

794
        int32_t bufLen = strlen(sql->command) + 512;
1,728✔
795
        char * buf = benchCalloc(bufLen, sizeof(char), false);
1,728✔
796
        (void)snprintf(buf , bufLen, "complete query with %d threads and %" PRIu64 " "
1,728✔
797
                             "sql %"PRIu64" spend %.6fs QPS: %.3f "
798
                             "query delay "
799
                             "avg: %.6fs "
800
                             "min: %.6fs "
801
                             "max: %.6fs "
802
                             "p90: %.6fs "
803
                             "p95: %.6fs "
804
                             "p99: %.6fs "
805
                             "SQL command: %s \n",
806
                             threadCnt, totalQueried,
807
                             i + 1, time_cost, qps,
808
                             avgDelay, minDelay, maxDelay, p90, p95, p99,
809
                             sql->command);
810

811
        if (result_array) {
1,728✔
812
            tools_cJSON *sqlResult = tools_cJSON_CreateObject();
118✔
813
            tools_cJSON_AddNumberToObject(sqlResult, "threads", threadCnt);
118✔
814
            tools_cJSON_AddNumberToObject(sqlResult, "total_queries", totalQueried);
118✔
815
            tools_cJSON_AddNumberToObject(sqlResult, "time_cost", time_cost);
118✔
816
            tools_cJSON_AddNumberToObject(sqlResult, "qps", qps);
118✔
817
            tools_cJSON_AddNumberToObject(sqlResult, "avg", avgDelay);
118✔
818
            tools_cJSON_AddNumberToObject(sqlResult, "min", minDelay);
118✔
819
            tools_cJSON_AddNumberToObject(sqlResult, "max", maxDelay);
118✔
820
            tools_cJSON_AddNumberToObject(sqlResult, "p90", p90);
118✔
821
            tools_cJSON_AddNumberToObject(sqlResult, "p95", p95);
118✔
822
            tools_cJSON_AddNumberToObject(sqlResult, "p99", p99);
118✔
823
            tools_cJSON_AddItemToArray(result_array, sqlResult);
118✔
824
        }
825

826
        infoPrintNoTimestamp("%s", buf);
1,728✔
827
        infoPrintNoTimestampToFile("%s", buf);
1,728✔
828
        tmfree(buf);
1,728✔
829
    }
830

831
    if (root) {
529✔
832
        char *jsonStr = tools_cJSON_PrintUnformatted(root);
59✔
833
        if (jsonStr) {
59✔
834
            FILE *fp = fopen(g_arguments->output_json_file, "w");
59✔
835
            if (fp) {
59✔
836
                fprintf(fp, "%s\n", jsonStr);
59✔
837
                fclose(fp);
59✔
838
            } else {
839
                errorPrint("Failed to open output JSON file, file name %s\n",
×
840
                    g_arguments->output_json_file);
841
            }
842

843
            free(jsonStr);
59✔
844
        }
845
        tools_cJSON_Delete(root);
59✔
846
    }
847

848
    ret = 0;
529✔
849

850
OVER:
608✔
851
    tmfree((char *)pids);
608✔
852
    tmfree((char *)infos);
608✔
853

854
    // free specialQueryInfo
855
    freeSpecialQueryInfo();
608✔
856
    return ret;
608✔
857
}
858

859
//
860
// specQueryMix
861
//
862
static int specQueryMix(uint16_t iface, char* dbName) {
143✔
863
    // init
864
    int ret            = -1;
143✔
865
    int nConcurrent    = g_queryInfo.specifiedQueryInfo.concurrent;
143✔
866
    pthread_t * pids   = benchCalloc(nConcurrent, sizeof(pthread_t), true);
143✔
867
    qThreadInfo *infos = benchCalloc(nConcurrent, sizeof(qThreadInfo), true);
143✔
868

869
    // concurent calc
870
    int total_sql_num = g_queryInfo.specifiedQueryInfo.sqls->size;
143✔
871
    int start_sql     = 0;
143✔
872
    int a             = total_sql_num / nConcurrent;
143✔
873
    if (a < 1) {
143✔
874
        warnPrint("sqls num:%d < concurent:%d, so set concurrent to %d\n", total_sql_num, nConcurrent, nConcurrent);
83✔
875
        nConcurrent = total_sql_num;
83✔
876
        a = 1;
83✔
877
    }
878
    int b = 0;
143✔
879
    if (nConcurrent != 0) {
143✔
880
        b = total_sql_num % nConcurrent;
143✔
881
    }
882

883
    //
884
    // running
885
    //
886
    int threadCnt = 0;
143✔
887
    for (int i = 0; i < nConcurrent; ++i) {
429✔
888
        qThreadInfo *pThreadInfo = infos + i;
286✔
889
        pThreadInfo->threadID    = i;
286✔
890
        pThreadInfo->start_sql   = start_sql;
286✔
891
        pThreadInfo->end_sql     = i < b ? start_sql + a : start_sql + a - 1;
286✔
892
        start_sql = pThreadInfo->end_sql + 1;
286✔
893
        pThreadInfo->total_delay = 0;
286✔
894
        pThreadInfo->dbName = dbName;
286✔
895

896
        // create conn
897
        if (initQueryConn(pThreadInfo, iface)){
286✔
898
            break;
×
899
        }
900
        // main run
901
        int code = pthread_create(pids + i, NULL, specQueryMixThread, pThreadInfo);
286✔
902
        if (code != 0) {
286✔
903
            errorPrint("failed specQueryMixThread create. error code =%d \n", code);
×
904
            break;
×
905
        }
906

907
        threadCnt ++;
286✔
908
    }
909

910
    bool needExit = false;
143✔
911
    // if failed, set termainte flag true like ctrl+c exit
912
    if (threadCnt != nConcurrent) {
143✔
913
        needExit = true;
×
914
        g_arguments->terminate = true;
×
915
    }
916

917
    // reset total
918
    g_queryInfo.specifiedQueryInfo.totalQueried = 0;
143✔
919
    g_queryInfo.specifiedQueryInfo.totalFail    = 0;
143✔
920

921
    int64_t start = toolsGetTimestampUs();
143✔
922
    for (int i = 0; i < threadCnt; ++i) {
429✔
923
        pthread_join(pids[i], NULL);
286✔
924
        qThreadInfo *pThreadInfo = infos + i;
286✔
925
        closeQueryConn(pThreadInfo, iface);
286✔
926

927
        // total queries
928
        g_queryInfo.specifiedQueryInfo.totalQueried += pThreadInfo->nSucc;
286✔
929
        if (g_arguments->continueIfFail == YES_IF_FAILED) {
286✔
930
            // yes need add failed count
931
            g_queryInfo.specifiedQueryInfo.totalQueried += pThreadInfo->nFail;
120✔
932
            g_queryInfo.specifiedQueryInfo.totalFail    += pThreadInfo->nFail;
120✔
933
        }
934

935
        // destory
936
        if (needExit) {
286✔
937
            benchArrayDestroy(pThreadInfo->query_delay_list);
×
938
            pThreadInfo->query_delay_list = NULL;
×
939
        }
940
    }
941
    int64_t end = toolsGetTimestampUs();
143✔
942

943
    // create
944
    if (needExit) {
143✔
945
        errorPrint("failed to create thread. expect nConcurrent=%d real threadCnt=%d,  exit testing.\n", nConcurrent, threadCnt);
×
946
        goto OVER;
×
947
    }
948

949
    // statistic
950
    totalChildQuery(infos, threadCnt, end - start, NULL);
143✔
951
    ret = 0;
143✔
952

953
OVER:
143✔
954
    tmfree(pids);
143✔
955
    tmfree(infos);
143✔
956

957
    // free sqls
958
    freeSpecialQueryInfo();
143✔
959

960
    return ret;
143✔
961
}
962

963
// Print the summary for a whole batch run: the accumulated sleep time and the
// min / avg / p90 / p95 / p99 / max of all collected delays.
//
// allSleep : accumulated sleep time; printed as allSleep/1000 seconds when > 0
// pDelays  : array of int64_t delay samples; it is sorted in place. Samples are
//            divided by 1E6 before being printed as seconds.
void totalBatchQuery(int32_t allSleep, BArray *pDelays) {
    // order the samples with compare(); the report below treats index 0 as
    // the minimum and the last index as the maximum
    qsort(pDelays->pData, pDelays->size, pDelays->elemSize, compare);

    // sum of every sample, used for the average
    double sum = 0;
    size_t idx = 0;
    while (idx < pDelays->size) {
        sum += *(int64_t *)benchArrayGet(pDelays, idx);
        idx++;
    }

    printf("\n");

    // sleep time is only reported when there was any
    if (allSleep > 0) {
        infoPrint("All sleep spend: %.3fs\n", (float)allSleep/1000);
    }

    // nothing to report without samples
    if (!pDelays->size) {
        return;
    }

    // pick each statistic by position in the sorted array
    double minSec = *(int64_t *)(benchArrayGet(pDelays, 0))/1E6;
    double avgSec = (double)sum/pDelays->size/1E6;
    double p90Sec = *(int64_t *)(benchArrayGet(pDelays,
                        (int32_t)(pDelays->size * 0.9)))/1E6;
    double p95Sec = *(int64_t *)(benchArrayGet(pDelays,
                        (int32_t)(pDelays->size * 0.95)))/1E6;
    double p99Sec = *(int64_t *)(benchArrayGet(pDelays,
                        (int32_t)(pDelays->size * 0.99)))/1E6;
    double maxSec = *(int64_t *)(benchArrayGet(pDelays,
                        (int32_t)(pDelays->size - 1)))/1E6;

    infoPrint(
            "Total delay: "
            "min delay: %.6fs, "
            "avg delay: %.6fs, "
            "p90: %.6fs, "
            "p95: %.6fs, "
            "p99: %.6fs, "
            "max: %.6fs\n",
            minSec, avgSec, p90Sec, p95Sec, p99Sec, maxSec);
}
58✔
1002

1003
//
1004
// specQuery Mix Batch
1005
//
1006
static int specQueryBatch(uint16_t iface, char* dbName) {
96✔
1007
    // init
1008
    BArray *pDelays    = NULL;
96✔
1009
    int ret            = -1;
96✔
1010
    int nConcurrent    = g_queryInfo.specifiedQueryInfo.concurrent;
96✔
1011
    uint64_t interval  = g_queryInfo.specifiedQueryInfo.queryInterval;
96✔
1012
    pthread_t * pids   = benchCalloc(nConcurrent, sizeof(pthread_t), true);
96✔
1013
    qThreadInfo *infos = benchCalloc(nConcurrent, sizeof(qThreadInfo), true);
96✔
1014
    infoPrint("start batch query, sleep interval:%" PRIu64 "ms query times:%" PRIu64 " thread:%d \n",
96✔
1015
        interval, g_queryInfo.query_times, nConcurrent);
1016

1017
    // concurent calc
1018
    int total_sql_num = g_queryInfo.specifiedQueryInfo.sqls->size;
96✔
1019
    int start_sql     = 0;
96✔
1020
    int a             = total_sql_num / nConcurrent;
96✔
1021
    if (a < 1) {
96✔
1022
        warnPrint("sqls num:%d < concurent:%d, set concurrent %d\n", total_sql_num, nConcurrent, nConcurrent);
×
1023
        nConcurrent = total_sql_num;
×
1024
        a = 1;
×
1025
    }
1026
    int b = 0;
96✔
1027
    if (nConcurrent != 0) {
96✔
1028
        b = total_sql_num % nConcurrent;
96✔
1029
    }
1030

1031
    //
1032
    // connect
1033
    //
1034
    int connCnt = 0;
96✔
1035
    for (int i = 0; i < nConcurrent; ++i) {
386✔
1036
        qThreadInfo *pThreadInfo = infos + i;
328✔
1037
        pThreadInfo->dbName = dbName;
328✔
1038
        // create conn
1039
        if (initQueryConn(pThreadInfo, iface)){
328✔
1040
            ret = -1;
38✔
1041
            goto OVER;
38✔
1042
        }
1043

1044
        connCnt ++;
290✔
1045
    }
1046

1047

1048
    // reset total
1049
    g_queryInfo.specifiedQueryInfo.totalQueried = 0;
58✔
1050
    g_queryInfo.specifiedQueryInfo.totalFail    = 0;
58✔
1051

1052
    //
1053
    // running
1054
    //
1055
    int threadCnt = 0;
58✔
1056
    int allSleep  = 0;
58✔
1057
    pDelays       = benchArrayInit(10, sizeof(int64_t));
58✔
1058
    for (int m = 0; m < g_queryInfo.query_times; ++m) {
348✔
1059
        // reset
1060
        threadCnt = 0;
290✔
1061
        start_sql = 0;
290✔
1062

1063
        // create thread
1064
        for (int i = 0; i < nConcurrent; ++i) {
1,740✔
1065
            qThreadInfo *pThreadInfo = infos + i;
1,450✔
1066
            pThreadInfo->threadID    = i;
1,450✔
1067
            pThreadInfo->start_sql   = start_sql;
1,450✔
1068
            pThreadInfo->end_sql     = i < b ? start_sql + a : start_sql + a - 1;
1,450✔
1069
            start_sql = pThreadInfo->end_sql + 1;
1,450✔
1070
            pThreadInfo->total_delay = 0;
1,450✔
1071
            // total zero
1072
            pThreadInfo->nSucc = 0;
1,450✔
1073
            pThreadInfo->nFail = 0;
1,450✔
1074

1075
            // main run
1076
            int code = pthread_create(pids + i, NULL, specQueryMixThread, pThreadInfo);
1,450✔
1077
            if (code != 0) {
1,450✔
1078
                errorPrint("failed specQueryBatchThread create. error code =%d \n", code);
×
1079
                break;
×
1080
            }
1081

1082
            threadCnt ++;
1,450✔
1083
        }
1084

1085
        bool needExit = false;
290✔
1086
        if (threadCnt != nConcurrent) {
290✔
1087
            // if failed, set termainte flag true like ctrl+c exit
1088
            needExit = true;
×
1089
            g_arguments->terminate = true;
×
1090
        }
1091

1092
        // wait thread finished
1093
        int64_t start = toolsGetTimestampUs();
290✔
1094
        for (int i = 0; i < threadCnt; ++i) {
1,740✔
1095
            pthread_join(pids[i], NULL);
1,450✔
1096
            qThreadInfo *pThreadInfo = infos + i;
1,450✔
1097
            // total queries
1098
            g_queryInfo.specifiedQueryInfo.totalQueried += pThreadInfo->nSucc;
1,450✔
1099
            if (g_arguments->continueIfFail == YES_IF_FAILED) {
1,450✔
1100
                // yes need add failed count
1101
                g_queryInfo.specifiedQueryInfo.totalQueried += pThreadInfo->nFail;
×
1102
                g_queryInfo.specifiedQueryInfo.totalFail    += pThreadInfo->nFail;
×
1103
            }
1104

1105
            // destory
1106
            if (needExit) {
1,450✔
1107
                benchArrayDestroy(pThreadInfo->query_delay_list);
×
1108
                pThreadInfo->query_delay_list = NULL;
×
1109
            }
1110
        }
1111
        int64_t end = toolsGetTimestampUs();
290✔
1112

1113
        // create
1114
        if (needExit) {
290✔
1115
            errorPrint("failed to create thread. expect nConcurrent=%d real threadCnt=%d,  exit testing.\n", nConcurrent, threadCnt);
×
1116
            goto OVER;
×
1117
        }
1118

1119
        // batch total
1120
        printf("\n");
290✔
1121
        totalChildQuery(infos, threadCnt, end - start, pDelays);
290✔
1122

1123
        // show batch total
1124
        int64_t delay = end - start;
290✔
1125
        infoPrint("count:%d execute batch spend: %" PRId64 "ms\n", m + 1, delay/1000);
290✔
1126

1127
        // sleep
1128
        if ( g_queryInfo.specifiedQueryInfo.batchQuery && interval > 0) {
290✔
1129
            allSleep += autoSleep(interval, delay);
290✔
1130
        }
1131

1132
        // check cancel
1133
        if(g_arguments->terminate) {
290✔
1134
            break;
×
1135
        }
1136
    }
1137
    ret = 0;
58✔
1138

1139
    // all total
1140
    totalBatchQuery(allSleep, pDelays);
58✔
1141

1142
OVER:
96✔
1143
    // close conn
1144
    for (int i = 0; i < connCnt; ++i) {
386✔
1145
        qThreadInfo *pThreadInfo = infos + i;
290✔
1146
        closeQueryConn(pThreadInfo, iface);
290✔
1147
    }
1148

1149
    // free threads
1150
    tmfree(pids);
96✔
1151
    tmfree(infos);
96✔
1152

1153
    // free sqls
1154
    freeSpecialQueryInfo();
96✔
1155

1156
    // free delays
1157
    if (pDelays) {
96✔
1158
        benchArrayDestroy(pDelays);
58✔
1159
    }
1160

1161
    return ret;
96✔
1162
}
1163

1164
// total query for end
1165
void totalQuery(int64_t spends) {
1,169✔
1166
    // total QPS
1167
    uint64_t totalQueried = g_queryInfo.specifiedQueryInfo.totalQueried
1,169✔
1168
        + g_queryInfo.superQueryInfo.totalQueried;
1,169✔
1169

1170
    // error rate
1171
    char errRate[128] = "";
1,169✔
1172
    if(g_arguments->continueIfFail == YES_IF_FAILED) {
1,169✔
1173
        uint64_t totalFail = g_queryInfo.specifiedQueryInfo.totalFail + g_queryInfo.superQueryInfo.totalFail;
268✔
1174
        if (totalQueried > 0) {
268✔
1175
            (void)snprintf(errRate, sizeof(errRate), " ,error %" PRIu64 " (rate:%.3f%%)", totalFail, ((float)totalFail * 100)/totalQueried);
268✔
1176
        }
1177
    }
1178

1179
    // show
1180
    double  tInS = (double)spends / 1000;
1,169✔
1181
    char buf[512] = "";
1,169✔
1182
    (void)snprintf(buf, sizeof(buf),
×
1183
                "Spend %.4f second completed total queries: %" PRIu64
1184
                ", the QPS of all threads: %10.3f%s\n\n",
1185
                tInS, totalQueried, (double)totalQueried / tInS, errRate);
1,169✔
1186
    infoPrint("%s", buf);
1,169✔
1187
    infoPrintToFile("%s", buf);
1,169✔
1188
}
1,169✔
1189

1190
int queryTestProcess() {
1,286✔
1191
    prompt(0);
1,286✔
1192

1193
    // covert addr
1194
    if (g_queryInfo.iface == REST_IFACE) {
1,286✔
1195
        encodeAuthBase64();
320✔
1196
        char *host = g_arguments->host          ? g_arguments->host : DEFAULT_HOST;
320✔
1197
        int   port = g_arguments->port_inputted ? g_arguments->port : DEFAULT_REST_PORT;
320✔
1198
        if (convertHostToServAddr(host,
320✔
1199
                    port,
1200
                    &(g_arguments->serv_addr)) != 0) {
320✔
1201
            errorPrint("%s", "convert host to server address\n");
×
1202
            return -1;
×
1203
        }
1204
    }
1205

1206
    // kill sql for executing seconds over "kill_slow_query_threshold"
1207
    if (g_queryInfo.iface == TAOSC_IFACE && g_queryInfo.killQueryThreshold) {
1,286✔
1208
        int32_t ret = killSlowQuery();
60✔
1209
        if (ret != 0) {
60✔
1210
            return ret;
×
1211
        }
1212
    }
1213

1214
    // fetch child name if super table
1215
    if ((g_queryInfo.superQueryInfo.sqlCount > 0) &&
1,286✔
1216
            (g_queryInfo.superQueryInfo.threadCnt > 0)) {
439✔
1217
        int32_t ret = fetchChildTableName(g_queryInfo.dbName, g_queryInfo.superQueryInfo.stbName);
439✔
1218
        if (ret != 0) {
439✔
1219
            errorPrint("fetchChildTableName dbName=%s stb=%s failed.", g_queryInfo.dbName, g_queryInfo.superQueryInfo.stbName);
×
1220
            return -1;
×
1221
        }
1222
    }
1223

1224
    //
1225
    // start running
1226
    //
1227

1228
    uint64_t startTs = toolsGetTimestampMs();
1,286✔
1229
    if(g_queryInfo.specifiedQueryInfo.sqls && g_queryInfo.specifiedQueryInfo.sqls->size > 0) {
1,286✔
1230
        // specified table
1231
        if (g_queryInfo.specifiedQueryInfo.mixed_query) {
847✔
1232
            // mixed
1233
            if(g_queryInfo.specifiedQueryInfo.batchQuery) {
239✔
1234
                if (specQueryBatch(g_queryInfo.iface, g_queryInfo.dbName)) {
96✔
1235
                    return -1;
38✔
1236
                }
1237
            } else {
1238
                if (specQueryMix(g_queryInfo.iface, g_queryInfo.dbName)) {
143✔
1239
                    return -1;
×
1240
                }
1241
            }
1242
        } else {
1243
            // no mixied
1244
            if (specQuery(g_queryInfo.iface, g_queryInfo.dbName)) {
608✔
1245
                return -1;
79✔
1246
            }
1247
        }
1248
    } else if(g_queryInfo.superQueryInfo.sqlCount > 0) {
439✔
1249
        // super table
1250
        if (stbQuery(g_queryInfo.iface, g_queryInfo.dbName)) {
439✔
1251
            return -1;
×
1252
        }
1253
    } else {
1254
        // nothing
1255
        errorPrint("%s\n", "Both 'specified_table_query' and 'super_table_query' sqls is empty.");
×
1256
        return -1;
×
1257
    }
1258

1259
    // total
1260
    totalQuery(toolsGetTimestampMs() - startTs);
1,169✔
1261
    return g_fail ? -1 : 0;
1,169✔
1262
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc