• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5055

17 May 2026 01:15AM UTC coverage: 73.355% (-0.003%) from 73.358%
#5055

push

travis-ci

web-flow
feat (TDgpt): Dynamic Model Synchronization Enhancements (#35344)

* refactor: do some internal refactor.

* fix: fix multiprocess sync issue.

* feat: add dynamic anomaly detection and forecasting services

* fix: log error message for undeploying model in exception handling

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: handle undeploy when model exists only on disk

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/286aafa0-c3ce-4c27-b803-2707571e9dc1

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: guard dynamic registry concurrent access

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: tighten service list locking scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: restore prophet support and update tests per review feedback

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: improve test name and move copy inside lock scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* Potential fix for pull request finding

Co-au... (continued)

281532 of 383795 relevant lines covered (73.35%)

135557734.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.21
/tools/taosBenchmark/src/benchQuery.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the MIT license as published by the Free Software
6
 * Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 */
12

13
#include <bench.h>
14
#include "benchLog.h"
15

16
// query and get result  record is true to total request
17
int selectAndGetResult(qThreadInfo *pThreadInfo, char *command, bool record) {
451,197✔
18
    int ret = 0;
451,197✔
19

20
    // user cancel
21
    if (g_arguments->terminate) {
451,197✔
22
        return -1;
×
23
    }
24

25
    // execute sql
26
    uint32_t threadID = pThreadInfo->threadID;
451,319✔
27
    char dbName[TSDB_DB_NAME_LEN] = {0};
450,789✔
28
    TOOLS_STRNCPY(dbName, g_queryInfo.dbName, TSDB_DB_NAME_LEN);
450,699✔
29

30
    if (g_queryInfo.iface == REST_IFACE) {
450,610✔
31
        int retCode = postProcessSql(command, g_queryInfo.dbName, 0, REST_IFACE,
4,388✔
32
                                   0, g_arguments->port, false,
2,194✔
33
                                   pThreadInfo->sockfd, pThreadInfo->filePath);
2,194✔
34
        if (0 != retCode) {
2,194✔
35
            errorPrint("====restful return fail, threadID[%u]\n",
×
36
                       threadID);
37
            ret = -1;
×
38
        }
39
    } else {
40
        // query
41
        TAOS *taos = pThreadInfo->conn->taos;
449,007✔
42
        int64_t rows  = 0;
449,350✔
43
        TAOS_RES *res = taos_query(taos, command);
449,350✔
44
        int code = taos_errno(res);
449,248✔
45
        if (res == NULL || code) {
449,212✔
46
            // failed query
47
            errorPrint("failed to execute sql:%s, "
592✔
48
                        "code: 0x%08x, reason:%s\n",
49
                        command, code, taos_errstr(res));
50
            ret = -1;
321✔
51
        } else {
52
            // succ query
53
            if (record)
448,620✔
54
                rows = fetchResult(res, pThreadInfo->filePath);
448,605✔
55
        }
56

57
        // free result
58
        if (res) {
449,425✔
59
            taos_free_result(res);
449,425✔
60
        }
61
        debugPrint("query sql:%s rows:%"PRId64"\n", command, rows);
449,137✔
62
    }
63

64
    // record count
65
    if (ret ==0) {
450,957✔
66
        // succ
67
        if (record)
450,636✔
68
            pThreadInfo->nSucc ++;
450,662✔
69
    } else {
70
        // fail
71
        if (record)
321✔
72
            pThreadInfo->nFail ++;
321✔
73

74
        // continue option
75
        if (YES_IF_FAILED == g_arguments->continueIfFail) {
321✔
76
            ret = 0; // force continue
×
77
        }
78
    }
79

80
    return ret;
450,296✔
81
}
82

83
// Intelligent sleep: pad a query round so that it lasts at least `interval`.
//
// interval : wanted duration of one round, in the unit toolsMsleep() takes (ms)
// delay    : time the round already took, in 1/1000 of that unit
//
// Returns the time slept in ms, or 0 when the round already used up the interval.
int32_t autoSleep(uint64_t interval, uint64_t delay ) {
    if (delay >= interval * 1000) {
        // nothing left to wait for
        return 0;
    }

    int32_t remainMs = (int32_t)(interval - delay / 1000);
    infoPrint("do sleep %dms ...\n", remainMs);
    toolsMsleep(remainMs);  // ms
    debugPrint("%s\n","do sleep end");
    return remainMs;
}
94

95
// reset
96
int32_t resetQueryCache(qThreadInfo* pThreadInfo) {
322✔
97
    // execute sql
98
    if (selectAndGetResult(pThreadInfo, "RESET QUERY CACHE", false)) {
322✔
99
        errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__);
×
100
        return -1;
×
101
    }
102
    // succ
103
    return 0;
322✔
104
}
105

106

107

108
//
109
//  ---------------------------------  second level function for Thread -----------------------------------
110
//
111

112
// show rela qps
113
int64_t showRealQPS(qThreadInfo* pThreadInfo, int64_t lastPrintTime, int64_t startTs) {
449,777✔
114
    int64_t now = toolsGetTimestampMs();
449,777✔
115
    if (now - lastPrintTime > 10 * 1000) {
447,925✔
116
        // real total
117
        uint64_t totalQueried = pThreadInfo->nSucc;
8,736✔
118
        if(g_arguments->continueIfFail == YES_IF_FAILED) {
11,030✔
119
            totalQueried += pThreadInfo->nFail;
1,800✔
120
        }
121
        infoPrint(
11,052✔
122
            "thread[%d] has currently completed queries: %" PRIu64 ", QPS: %10.3f\n",
123
            pThreadInfo->threadID, totalQueried,
124
            (double)(totalQueried / ((now - startTs) / 1000.0)));
125
        return now;
11,052✔
126
    } else {
127
        return lastPrintTime;
439,340✔
128
    }
129
}
130

131
// spec query mixed thread
132
static void *specQueryMixThread(void *sarg) {
2,208✔
133
    qThreadInfo *pThreadInfo = (qThreadInfo*)sarg;
2,208✔
134
#ifdef LINUX
135
    prctl(PR_SET_NAME, "specQueryMixThread");
2,208✔
136
#endif
137
    // use db
138
    if (g_queryInfo.dbName) {
2,208✔
139
        if (pThreadInfo->conn &&
2,208✔
140
            pThreadInfo->conn->taos &&
4,416✔
141
            taos_select_db(pThreadInfo->conn->taos, g_queryInfo.dbName)) {
2,208✔
142
                errorPrint("thread[%d]: failed to select database(%s)\n", pThreadInfo->threadID, g_queryInfo.dbName);
×
143
                return NULL;
×
144
        }
145
    }
146

147
    int64_t st = 0;
2,208✔
148
    int64_t et = 0;
2,208✔
149
    int64_t startTs        = toolsGetTimestampMs();
2,208✔
150
    int64_t lastPrintTime  = startTs;
2,129✔
151
    // batchQuery
152
    bool     batchQuery    = g_queryInfo.specifiedQueryInfo.batchQuery;
2,129✔
153
    uint64_t queryTimes    = batchQuery ? 1 : g_queryInfo.specifiedQueryInfo.queryTimes;
2,129✔
154
    uint64_t interval      = batchQuery ? 0 : g_queryInfo.specifiedQueryInfo.queryInterval;
2,208✔
155

156
    pThreadInfo->query_delay_list = benchArrayInit(queryTimes, sizeof(int64_t));
2,208✔
157
    for (int i = pThreadInfo->start_sql; i <= pThreadInfo->end_sql; ++i) {
4,337✔
158
        SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
2,208✔
159
        for (uint64_t j = 0; j < queryTimes; ++j) {
20,513✔
160
            // use cancel
161
            if(g_arguments->terminate) {
18,384✔
162
                infoPrint("%s\n", "user cancel , so exit testing.");
×
163
                break;
×
164
            }
165

166
            // reset cache
167
            if (g_queryInfo.reset_query_cache) {
18,384✔
168
                if (resetQueryCache(pThreadInfo)) {
×
169
                    errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__);
×
170
                    return NULL;
×
171
                }
172
            }
173

174
            // execute sql
175
            st = toolsGetTimestampUs();
18,384✔
176
            int ret = selectAndGetResult(pThreadInfo, sql->command, true);
18,305✔
177
            if (ret) {
18,384✔
178
                g_fail = true;
×
179
                errorPrint("failed call mix selectAndGetResult, i=%d j=%" PRIu64 "", i, j);
×
180
                return NULL;
×
181
            }
182
            et = toolsGetTimestampUs();
18,384✔
183

184
            // sleep
185
            if (interval > 0) {
18,384✔
186
                autoSleep(interval, et - st);
204✔
187
            }
188

189
            // delay
190
            if (ret == 0) {
18,384✔
191
                int64_t* delay = benchCalloc(1, sizeof(int64_t), false);
18,384✔
192
                *delay = et - st;
18,384✔
193
                debugPrint("%s() LN%d, delay: %"PRId64"\n", __func__, __LINE__, *delay);
18,384✔
194

195
                pThreadInfo->total_delay += *delay;
18,384✔
196
                if(benchArrayPush(pThreadInfo->query_delay_list, delay) == NULL){
18,384✔
197
                    tmfree(delay);
×
198
                }
199
            }
200

201
            // real show
202
            lastPrintTime = showRealQPS(pThreadInfo, lastPrintTime, startTs);
18,384✔
203
        }
204
    }
205

206
    return NULL;
2,208✔
207
}
208

209
// spec query thread
210
static void *specQueryThread(void *sarg) {
6,799✔
211
    qThreadInfo *pThreadInfo = (qThreadInfo *)sarg;
6,799✔
212
#ifdef LINUX
213
    prctl(PR_SET_NAME, "specQueryThread");
6,799✔
214
#endif
215
    uint64_t st = 0;
6,799✔
216
    uint64_t et = 0;
6,799✔
217
    int32_t  index = 0;
6,799✔
218

219
    // use db
220
    if (g_queryInfo.dbName) {
6,799✔
221
        if (pThreadInfo->conn &&
6,799✔
222
            pThreadInfo->conn->taos &&
12,012✔
223
            taos_select_db(pThreadInfo->conn->taos, g_queryInfo.dbName)) {
6,006✔
224
                errorPrint("thread[%d]: failed to select database(%s)\n", pThreadInfo->threadID, g_queryInfo.dbName);
×
225
                return NULL;
×
226
        }
227
    }
228

229
    uint64_t  queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes;
6,799✔
230
    uint64_t  interval   = g_queryInfo.specifiedQueryInfo.queryInterval;
6,799✔
231
    pThreadInfo->query_delay_list = benchArrayInit(queryTimes, sizeof(int64_t));
6,799✔
232

233
    uint64_t  startTs       = toolsGetTimestampMs();
6,799✔
234
    uint64_t  lastPrintTime = startTs;
6,760✔
235

236
    SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, pThreadInfo->querySeq);
6,760✔
237

238
    if (sql->result[0] != '\0') {
6,799✔
239
        (void)snprintf(pThreadInfo->filePath, MAX_PATH_LEN, "%s-%d",
12,188✔
240
                sql->result, pThreadInfo->threadID);
6,094✔
241
    }
242

243
    while (index < (int64_t)queryTimes) {
67,618✔
244
        // use cancel
245
        if(g_arguments->terminate) {
61,196✔
246
            infoPrint("thread[%d] user cancel , so exit testing.\n", pThreadInfo->threadID);
×
247
            break;
×
248
        }
249

250
        // reset cache
251
        if (g_queryInfo.reset_query_cache) {
62,050✔
252
            if (resetQueryCache(pThreadInfo)) {
243✔
253
                errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__);
×
254
                return NULL;
×
255
            }
256
        }
257

258
        // execute sql
259
        st = toolsGetTimestampUs();
62,050✔
260
        int ret = selectAndGetResult(pThreadInfo, sql->command, true);
61,939✔
261
        if (ret) {
62,013✔
262
            g_fail = true;
321✔
263
            errorPrint("failed call spec selectAndGetResult, index=%d\n", index);
321✔
264
            break;
321✔
265
        }
266
        et = toolsGetTimestampUs();
61,692✔
267

268
        // sleep
269
        if (interval > 0) {
61,617✔
270
            autoSleep(interval, et - st);
9,419✔
271
        }
272

273

274
        uint64_t delay = et - st;
61,517✔
275
        debugPrint("%s() LN%d, delay: %"PRIu64"\n", __func__, __LINE__, delay);
60,151✔
276

277
        if (ret == 0) {
61,644✔
278
            // only succ add delay list
279
            benchArrayPushNoFree(pThreadInfo->query_delay_list, &delay);
61,644✔
280
            pThreadInfo->total_delay += delay;
61,051✔
281
        }
282
        index++;
61,154✔
283

284
        // real show
285
        lastPrintTime = showRealQPS(pThreadInfo, lastPrintTime, startTs);
61,154✔
286
    }
287

288
    return NULL;
6,687✔
289
}
290

291
// super table query thread
292
static void *stbQueryThread(void *sarg) {
2,080✔
293
    char *sqlstr = benchCalloc(1, TOOLS_MAX_ALLOWED_SQL_LEN, false);
2,080✔
294
    qThreadInfo *pThreadInfo = (qThreadInfo *)sarg;
2,080✔
295
#ifdef LINUX
296
    prctl(PR_SET_NAME, "stbQueryThread");
2,080✔
297
#endif
298

299
    uint64_t st = 0;
2,080✔
300
    uint64_t et = 0;
2,080✔
301

302
    uint64_t queryTimes = g_queryInfo.superQueryInfo.queryTimes;
2,080✔
303
    uint64_t interval   = g_queryInfo.superQueryInfo.queryInterval;
2,080✔
304
    pThreadInfo->query_delay_list = benchArrayInit(queryTimes, sizeof(uint64_t));
2,080✔
305

306
    uint64_t startTs = toolsGetTimestampMs();
2,080✔
307
    uint64_t lastPrintTime = startTs;
2,080✔
308
    while (queryTimes--) {
25,832✔
309
        // use cancel
310
        if(g_arguments->terminate) {
23,752✔
311
            infoPrint("%s\n", "user cancel , so exit testing.");
×
312
            break;
×
313
        }
314

315
        // reset cache
316
        if (g_queryInfo.reset_query_cache) {
23,752✔
317
            if (resetQueryCache(pThreadInfo)) {
79✔
318
                errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__);
×
319
                return NULL;
×
320
            }
321
        }
322

323
        // execute
324
        st = toolsGetTimestampMs();
23,752✔
325
        // for each table
326
        for (int i = (int)pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) {
96,699✔
327
            // use cancel
328
            if(g_arguments->terminate) {
72,947✔
329
                infoPrint("%s\n", "user cancel , so exit testing.");
×
330
                break;
×
331
            }
332

333
            // for each sql
334
            for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
443,092✔
335
                memset(sqlstr, 0, TOOLS_MAX_ALLOWED_SQL_LEN);
369,660✔
336
                // use cancel
337
                if(g_arguments->terminate) {
369,660✔
338
                    infoPrint("%s\n", "user cancel , so exit testing.");
×
339
                    break;
×
340
                }
341

342
                // get real child name sql
343
                if (replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr, i)) {
370,614✔
344
                    // fault
345
                    tmfree(sqlstr);
×
346
                    return NULL;
×
347
                }
348

349
                if (g_queryInfo.superQueryInfo.result[j][0] != '\0') {
370,558✔
350
                    (void)snprintf(pThreadInfo->filePath, MAX_PATH_LEN, "%s-%d",
621,058✔
351
                            g_queryInfo.superQueryInfo.result[j],
310,674✔
352
                            pThreadInfo->threadID);
353
                }
354

355
                // execute sql
356
                uint64_t s = toolsGetTimestampUs();
370,668✔
357
                int ret = selectAndGetResult(pThreadInfo, sqlstr, true);
370,556✔
358
                if (ret) {
370,394✔
359
                    // found error
360
                    errorPrint("failed call stb selectAndGetResult, i=%d j=%d\n", i, j);
×
361
                    g_fail = true;
×
362
                    tmfree(sqlstr);
×
363
                    return NULL;
×
364
                }
365
                uint64_t delay = toolsGetTimestampUs() - s;
370,394✔
366
                debugPrint("%s() LN%d, delay: %"PRIu64"\n", __func__, __LINE__, delay);
369,435✔
367
                if (ret == 0) {
370,078✔
368
                    // only succ add delay list
369
                    benchArrayPushNoFree(pThreadInfo->query_delay_list, &delay);
370,078✔
370
                    pThreadInfo->total_delay += delay;
369,274✔
371
                }
372

373
                // show real QPS
374
                lastPrintTime = showRealQPS(pThreadInfo, lastPrintTime, startTs);
369,572✔
375
            }
376
        }
377
        et = toolsGetTimestampMs();
23,819✔
378

379
        // sleep
380
        if (interval > 0) {
23,752✔
381
            autoSleep(interval, et - st);
3,665✔
382
        }
383

384
    }
385
    tmfree(sqlstr);
2,080✔
386

387
    return NULL;
2,080✔
388
}
389

390
//
391
// ---------------------------------  first level function ------------------------------
392
//
393

394
// Merge the per-thread latency lists and report the overall statistics.
//
// infos     : array of threadCnt thread contexts; each query_delay_list is
//             merged, destroyed and reset to NULL
// threadCnt : number of entries in infos
// spend     : wall-clock time of the whole run (divided by 1E6 to get seconds)
// pDelays   : optional array that receives a copy of all merged delays
//
// Prints min/avg/p90/p95/p99/max. When super-table SQL is configured and an
// output JSON file is set, the same figures are written to that file.
// Does nothing when there are no threads; reports an error and returns when no
// query succeeded.
void totalChildQuery(qThreadInfo* infos, int threadCnt, int64_t spend, BArray *pDelays) {
    // valid check
    if (NULL == infos || 0 == threadCnt) {
        return;
    }

    // statistic
    BArray *merged   = benchArrayInit(1, sizeof(int64_t));
    double  delaySum = 0;

    // collect and release every thread's list
    for (int t = 0; t < threadCnt; ++t) {
        qThreadInfo *info = &infos[t];
        if (info->query_delay_list != NULL) {
            // append delay
            benchArrayAddBatch(merged, info->query_delay_list->pData,
                    info->query_delay_list->size, false);
            delaySum += info->total_delay;

            // free delay
            benchArrayDestroy(info->query_delay_list);
            info->query_delay_list = NULL;
        }
    }

    // succ is zero
    if (0 == merged->size) {
        errorPrint("%s", "succ queries count is zero.\n");
        benchArrayDestroy(merged);
        return;
    }

    // sort ascending so that percentiles can be read by index
    qsort(merged->pData, merged->size, merged->elemSize, compare);

    size_t totalQueried = merged->size;
    double time_cost    = spend / 1E6;
    double qps          = 0.0;
    if (time_cost > 0) {
        qps = totalQueried / time_cost;
    }
    double avgDelay = (double)delaySum/merged->size/1E6;
    double minDelay = *(int64_t *)(benchArrayGet(merged, 0))/1E6;
    double maxDelay = *(int64_t *)(benchArrayGet(merged, (int32_t)(merged->size - 1)))/1E6;
    double p90      = *(int64_t *)(benchArrayGet(merged, (int32_t)(merged->size * 0.90)))/1E6;
    double p95      = *(int64_t *)(benchArrayGet(merged, (int32_t)(merged->size * 0.95)))/1E6;
    double p99      = *(int64_t *)(benchArrayGet(merged, (int32_t)(merged->size * 0.99)))/1E6;

    // show delay min max (the list is known to be non-empty here)
    infoPrint(
            "spend %.6fs using "
            "%d threads complete query %zu times,  "
            "min delay: %.6fs, "
            "avg delay: %.6fs, "
            "p90: %.6fs, "
            "p95: %.6fs, "
            "p99: %.6fs, "
            "max: %.6fs\n",
            time_cost,
            threadCnt, totalQueried,
            minDelay,
            avgDelay,
            p90,
            p95,
            p99,
            maxDelay);

    // output json for super table query
    if (g_queryInfo.superQueryInfo.sqlCount > 0 && g_arguments->output_json_file) {
        tools_cJSON *root    = tools_cJSON_CreateObject();
        tools_cJSON *results = NULL;

        if (root != NULL) {
            results = tools_cJSON_CreateArray();
            if (results == NULL) {
                errorPrint("Failed to create result_array JSON object\n");
                tools_cJSON_Delete(root);
                root = NULL;
            }
        }

        if (root != NULL) {
            tools_cJSON_AddItemToObject(root, "results", results);

            tools_cJSON *entry = tools_cJSON_CreateObject();
            if (entry == NULL) {
                errorPrint("Failed to create JSON object for SQL result.\n");
            } else {
                tools_cJSON_AddNumberToObject(entry, "threads", threadCnt);
                tools_cJSON_AddNumberToObject(entry, "total_queries", totalQueried);
                tools_cJSON_AddNumberToObject(entry, "time_cost", time_cost);
                tools_cJSON_AddNumberToObject(entry, "qps", qps);
                tools_cJSON_AddNumberToObject(entry, "avg", avgDelay);
                tools_cJSON_AddNumberToObject(entry, "min", minDelay);
                tools_cJSON_AddNumberToObject(entry, "max", maxDelay);
                tools_cJSON_AddNumberToObject(entry, "p90", p90);
                tools_cJSON_AddNumberToObject(entry, "p95", p95);
                tools_cJSON_AddNumberToObject(entry, "p99", p99);
                tools_cJSON_AddItemToArray(results, entry);
            }

            // the document is written even if the entry could not be created
            char *text = tools_cJSON_PrintUnformatted(root);
            if (text != NULL) {
                FILE *out = fopen(g_arguments->output_json_file, "w");
                if (out == NULL) {
                    errorPrint("Failed to open output JSON file, file name %s\n",
                        g_arguments->output_json_file);
                } else {
                    fprintf(out, "%s\n", text);
                    fclose(out);
                }
                free(text);
            }
            tools_cJSON_Delete(root);
        }
    }

    // copy to another
    if (pDelays) {
        benchArrayAddBatch(pDelays, merged->pData, merged->size, false);
    }
    benchArrayDestroy(merged);
}
527

528
//
529
// super table query
530
//
531
static int stbQuery(uint16_t iface, char* dbName) {
657✔
532
    int ret = -1;
657✔
533
    pthread_t * pidsOfSub   = NULL;
657✔
534
    qThreadInfo *threadInfos = NULL;
657✔
535
    g_queryInfo.superQueryInfo.totalQueried = 0;
657✔
536
    g_queryInfo.superQueryInfo.totalFail    = 0;
657✔
537

538
    // check
539
    if ((g_queryInfo.superQueryInfo.sqlCount == 0)
657✔
540
        || (g_queryInfo.superQueryInfo.threadCnt == 0)) {
657✔
541
        return 0;
×
542
    }
543

544
    // malloc
545
    pidsOfSub = benchCalloc(1, g_queryInfo.superQueryInfo.threadCnt
657✔
546
                            *sizeof(pthread_t),
547
                            false);
548
    threadInfos = benchCalloc(1, g_queryInfo.superQueryInfo.threadCnt
657✔
549
                                *sizeof(qThreadInfo), false);
550

551
    int64_t ntables = g_queryInfo.superQueryInfo.childTblCount;
657✔
552
    int nConcurrent = g_queryInfo.superQueryInfo.threadCnt;
657✔
553

554
    int64_t a = ntables / nConcurrent;
657✔
555
    if (a < 1) {
657✔
556
        nConcurrent = (int)ntables;
114✔
557
        a = 1;
114✔
558
    }
559

560
    int64_t b = 0;
657✔
561
    if (nConcurrent != 0) {
657✔
562
        b = ntables % nConcurrent;
657✔
563
    }
564

565
    uint64_t tableFrom = 0;
657✔
566
    int threadCnt = 0;
657✔
567
    for (int i = 0; i < nConcurrent; i++) {
2,737✔
568
        qThreadInfo *pThreadInfo = threadInfos + i;
2,080✔
569
        pThreadInfo->dbName = dbName;
2,080✔
570
        pThreadInfo->threadID = i;
2,080✔
571
        pThreadInfo->start_table_from = tableFrom;
2,080✔
572
        pThreadInfo->ntables = i < b ? a + 1 : a;
2,080✔
573
        pThreadInfo->end_table_to =
2,080✔
574
                i < b ? tableFrom + a : tableFrom + a - 1;
2,080✔
575
        tableFrom = pThreadInfo->end_table_to + 1;
2,080✔
576
        // create conn
577
        if (initQueryConn(pThreadInfo, iface)){
2,080✔
578
            break;
×
579
        }
580
        int code = pthread_create(pidsOfSub + i, NULL, stbQueryThread, pThreadInfo);
2,080✔
581
        if (code != 0) {
2,080✔
582
            errorPrint("failed stbQueryThread create. error code =%d \n", code);
×
583
            break;
×
584
        }
585
        threadCnt ++;
2,080✔
586
    }
587

588
    bool needExit = false;
657✔
589
    // if failed, set termainte flag true like ctrl+c exit
590
    if (threadCnt != nConcurrent  ) {
657✔
591
        needExit = true;
×
592
        g_arguments->terminate = true;
×
593
        goto OVER;
×
594
    }
595

596
    // reset total
597
    g_queryInfo.superQueryInfo.totalQueried = 0;
657✔
598
    g_queryInfo.superQueryInfo.totalFail    = 0;
657✔
599

600
    // real thread count
601
    g_queryInfo.superQueryInfo.threadCnt = threadCnt;
657✔
602
    int64_t start = toolsGetTimestampUs();
657✔
603

604
    for (int i = 0; i < threadCnt; i++) {
2,737✔
605
        pthread_join(pidsOfSub[i], NULL);
2,080✔
606
        qThreadInfo *pThreadInfo = threadInfos + i;
2,080✔
607
        // add succ
608
        g_queryInfo.superQueryInfo.totalQueried += pThreadInfo->nSucc;
2,080✔
609
        if (g_arguments->continueIfFail == YES_IF_FAILED) {
2,080✔
610
            // "yes" need add fail cnt
611
            g_queryInfo.superQueryInfo.totalQueried += pThreadInfo->nFail;
381✔
612
            g_queryInfo.superQueryInfo.totalFail    += pThreadInfo->nFail;
381✔
613
        }
614

615
        // close conn
616
        closeQueryConn(pThreadInfo, iface);
2,080✔
617
    }
618
    int64_t end = toolsGetTimestampUs();
657✔
619

620
    if (needExit) {
657✔
621
        goto OVER;
×
622
    }
623

624
    // total show
625
    totalChildQuery(threadInfos, threadCnt, end - start, NULL);
657✔
626

627
    ret = 0;
657✔
628

629
OVER:
657✔
630
    tmfree((char *)pidsOfSub);
657✔
631
    tmfree((char *)threadInfos);
657✔
632

633
    for (int64_t i = 0; i < g_queryInfo.superQueryInfo.childTblCount; ++i) {
5,223✔
634
        tmfree(g_queryInfo.superQueryInfo.childTblName[i]);
4,566✔
635
    }
636
    tmfree(g_queryInfo.superQueryInfo.childTblName);
657✔
637
    return ret;
657✔
638
}
639

640
//
641
// specQuery
642
//
643
static int specQuery(uint16_t iface, char* dbName) {
839✔
644
    int ret = -1;
839✔
645
    pthread_t    *pids = NULL;
839✔
646
    qThreadInfo *infos = NULL;
839✔
647
    int    nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent;
839✔
648
    uint64_t nSqlCount = g_queryInfo.specifiedQueryInfo.sqls->size;
839✔
649
    g_queryInfo.specifiedQueryInfo.totalQueried = 0;
839✔
650
    g_queryInfo.specifiedQueryInfo.totalFail    = 0;
839✔
651

652
    // check invaid
653
    if(nSqlCount == 0 || nConcurrent == 0 ) {
839✔
654
        if(nSqlCount == 0)
×
655
           warnPrint("specified table query sql count is %" PRIu64 ".\n", nSqlCount);
×
656
        if(nConcurrent == 0)
×
657
           warnPrint("nConcurrent is %d , specified_table_query->nConcurrent is zero. \n", nConcurrent);
×
658
        return 0;
×
659
    }
660

661
    // malloc threads memory
662
    pids  = benchCalloc(1, nConcurrent * sizeof(pthread_t),  false);
839✔
663
    infos = benchCalloc(1, nConcurrent * sizeof(qThreadInfo), false);
839✔
664

665
    tools_cJSON *root = NULL;
839✔
666
    tools_cJSON *result_array = NULL;
839✔
667
    if (g_arguments->output_json_file) {
839✔
668
       root = tools_cJSON_CreateObject();
75✔
669
       if (root != NULL) {
75✔
670
            result_array = tools_cJSON_CreateArray();
75✔
671
            tools_cJSON_AddItemToObject(root, "results", result_array);
75✔
672
       }
673
    }
674

675
    for (uint64_t i = 0; i < nSqlCount; i++) {
3,215✔
676
        if( g_arguments->terminate ) {
2,483✔
677
            break;
×
678
        }
679

680
        // reset
681
        memset(pids,  0, nConcurrent * sizeof(pthread_t));
2,483✔
682
        memset(infos, 0, nConcurrent * sizeof(qThreadInfo));
2,483✔
683

684
        // get execute sql
685
        SSQL *sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
2,483✔
686

687
        // create threads
688
        int threadCnt = 0;
2,483✔
689
        for (int j = 0; j < nConcurrent; j++) {
9,282✔
690
           qThreadInfo *pThreadInfo = infos + j;
6,799✔
691
           pThreadInfo->threadID = i * nConcurrent + j;
6,799✔
692
           pThreadInfo->querySeq = i;
6,799✔
693
           pThreadInfo->dbName = dbName;
6,799✔
694

695
           // create conn
696
           if (initQueryConn(pThreadInfo, iface)) {
6,799✔
697
               break;
×
698
           }
699

700
           int code = pthread_create(pids + j, NULL, specQueryThread, pThreadInfo);
6,799✔
701
           if (code != 0) {
6,799✔
702
               errorPrint("failed specQueryThread create. error code =%d \n", code);
×
703
               break;
×
704
           }
705
           threadCnt++;
6,799✔
706
        }
707

708
        bool needExit = false;
2,483✔
709
        // if failed, set termainte flag true like ctrl+c exit
710
        if (threadCnt != nConcurrent  ) {
2,483✔
711
            needExit = true;
×
712
            g_arguments->terminate = true;
×
713
        }
714

715
        int64_t start = toolsGetTimestampUs();
2,483✔
716
        // wait threads execute finished one by one
717
        for (int j = 0; j < threadCnt ; j++) {
9,282✔
718
           pthread_join(pids[j], NULL);
6,799✔
719
           qThreadInfo *pThreadInfo = infos + j;
6,799✔
720
           closeQueryConn(pThreadInfo, iface);
6,799✔
721

722
           // need exit in loop
723
           if (needExit) {
6,799✔
724
                // free BArray
725
                benchArrayDestroy(pThreadInfo->query_delay_list);
×
726
                pThreadInfo->query_delay_list = NULL;
×
727
           }
728
        }
729
        int64_t spend = toolsGetTimestampUs() - start;
2,483✔
730
        if(spend == 0) {
2,483✔
731
            // avoid xx/spend expr throw error
732
            spend = 1;
×
733
        }
734

735
        // create
736
        if (needExit) {
2,483✔
737
            errorPrint("failed to create thread. expect nConcurrent=%d real threadCnt=%d,  exit testing.\n", nConcurrent, threadCnt);
×
738
            goto OVER;
×
739
        }
740

741
        //
742
        // show QPS and P90 ...
743
        //
744
        uint64_t n = 0;
2,483✔
745
        double  total_delays = 0.0;
2,483✔
746
        uint64_t totalQueried = 0;
2,483✔
747
        uint64_t totalFail    = 0;
2,483✔
748
        for (int j = 0; j < threadCnt; j++) {
9,282✔
749
           qThreadInfo *pThreadInfo = infos + j;
6,799✔
750
           if(pThreadInfo->query_delay_list == NULL) {
6,799✔
751
                continue;;
×
752
           }
753

754
           // total one sql
755
           for (uint64_t k = 0; k < pThreadInfo->query_delay_list->size; k++) {
68,717✔
756
                int64_t * delay = benchArrayGet(pThreadInfo->query_delay_list, k);
61,918✔
757
                sql->delay_list[n++] = *delay;
61,918✔
758
                total_delays += *delay;
61,918✔
759
           }
760

761
           // total queries
762
           totalQueried += pThreadInfo->nSucc;
6,799✔
763
           if (g_arguments->continueIfFail == YES_IF_FAILED) {
6,799✔
764
                totalQueried += pThreadInfo->nFail;
724✔
765
                totalFail    += pThreadInfo->nFail;
724✔
766
           }
767

768
           // free BArray query_delay_list
769
           benchArrayDestroy(pThreadInfo->query_delay_list);
6,799✔
770
           pThreadInfo->query_delay_list = NULL;
6,799✔
771
        }
772

773
        // appand current sql
774
        g_queryInfo.specifiedQueryInfo.totalQueried += totalQueried;
2,483✔
775
        g_queryInfo.specifiedQueryInfo.totalFail    += totalFail;
2,483✔
776

777
        // succ is zero
778
        if(totalQueried == 0 || n == 0) {
2,483✔
779
            errorPrint("%s", "succ queries count is zero.\n");
107✔
780
            goto OVER;
107✔
781
        }
782

783
        qsort(sql->delay_list, n, sizeof(*sql->delay_list), compare);
2,376✔
784

785
        double time_cost = spend / 1E6;
2,376✔
786
        double qps = totalQueried / time_cost;
2,376✔
787
        double avgDelay = total_delays / n / 1E6;
2,376✔
788
        double minDelay = sql->delay_list[0] / 1E6;
2,376✔
789
        double maxDelay = sql->delay_list[n - 1] / 1E6;
2,376✔
790
        double p90 = sql->delay_list[(uint64_t)(n * 0.90)] / 1E6;
2,376✔
791
        double p95 = sql->delay_list[(uint64_t)(n * 0.95)] / 1E6;
2,376✔
792
        double p99 = sql->delay_list[(uint64_t)(n * 0.99)] / 1E6;
2,376✔
793

794
        int32_t bufLen = strlen(sql->command) + 512;
2,376✔
795
        char * buf = benchCalloc(bufLen, sizeof(char), false);
2,376✔
796
        (void)snprintf(buf , bufLen, "complete query with %d threads and %" PRIu64 " "
2,376✔
797
                             "sql %"PRIu64" spend %.6fs QPS: %.3f "
798
                             "query delay "
799
                             "avg: %.6fs "
800
                             "min: %.6fs "
801
                             "max: %.6fs "
802
                             "p90: %.6fs "
803
                             "p95: %.6fs "
804
                             "p99: %.6fs "
805
                             "SQL command: %s \n",
806
                             threadCnt, totalQueried,
807
                             i + 1, time_cost, qps,
808
                             avgDelay, minDelay, maxDelay, p90, p95, p99,
809
                             sql->command);
810

811
        if (result_array) {
2,376✔
812
            tools_cJSON *sqlResult = tools_cJSON_CreateObject();
150✔
813
            tools_cJSON_AddNumberToObject(sqlResult, "threads", threadCnt);
150✔
814
            tools_cJSON_AddNumberToObject(sqlResult, "total_queries", totalQueried);
150✔
815
            tools_cJSON_AddNumberToObject(sqlResult, "time_cost", time_cost);
150✔
816
            tools_cJSON_AddNumberToObject(sqlResult, "qps", qps);
150✔
817
            tools_cJSON_AddNumberToObject(sqlResult, "avg", avgDelay);
150✔
818
            tools_cJSON_AddNumberToObject(sqlResult, "min", minDelay);
150✔
819
            tools_cJSON_AddNumberToObject(sqlResult, "max", maxDelay);
150✔
820
            tools_cJSON_AddNumberToObject(sqlResult, "p90", p90);
150✔
821
            tools_cJSON_AddNumberToObject(sqlResult, "p95", p95);
150✔
822
            tools_cJSON_AddNumberToObject(sqlResult, "p99", p99);
150✔
823
            tools_cJSON_AddItemToArray(result_array, sqlResult);
150✔
824
        }
825

826
        infoPrintNoTimestamp("%s", buf);
2,376✔
827
        infoPrintNoTimestampToFile("%s", buf);
2,376✔
828
        tmfree(buf);
2,376✔
829
    }
830

831
    if (root) {
732✔
832
        char *jsonStr = tools_cJSON_PrintUnformatted(root);
75✔
833
        if (jsonStr) {
75✔
834
            FILE *fp = fopen(g_arguments->output_json_file, "w");
75✔
835
            if (fp) {
75✔
836
                fprintf(fp, "%s\n", jsonStr);
75✔
837
                fclose(fp);
75✔
838
            } else {
839
                errorPrint("Failed to open output JSON file, file name %s\n",
×
840
                    g_arguments->output_json_file);
841
            }
842

843
            free(jsonStr);
75✔
844
        }
845
        tools_cJSON_Delete(root);
75✔
846
    }
847

848
    ret = 0;
732✔
849

850
OVER:
839✔
851
    tmfree((char *)pids);
839✔
852
    tmfree((char *)infos);
839✔
853

854
    // free specialQueryInfo
855
    freeSpecialQueryInfo();
839✔
856
    return ret;
839✔
857
}
858

859
//
860
// specQueryMix
861
//
862
static int specQueryMix(uint16_t iface, char* dbName) {
204✔
863
    // init
864
    int ret            = -1;
204✔
865
    int nConcurrent    = g_queryInfo.specifiedQueryInfo.concurrent;
204✔
866
    pthread_t * pids   = benchCalloc(nConcurrent, sizeof(pthread_t), true);
204✔
867
    qThreadInfo *infos = benchCalloc(nConcurrent, sizeof(qThreadInfo), true);
204✔
868

869
    // concurent calc
870
    int total_sql_num = g_queryInfo.specifiedQueryInfo.sqls->size;
204✔
871
    int start_sql     = 0;
204✔
872
    int a             = total_sql_num / nConcurrent;
204✔
873
    if (a < 1) {
204✔
874
        warnPrint("sqls num:%d < concurent:%d, so set concurrent to %d\n", total_sql_num, nConcurrent, nConcurrent);
125✔
875
        nConcurrent = total_sql_num;
125✔
876
        a = 1;
125✔
877
    }
878
    int b = 0;
204✔
879
    if (nConcurrent != 0) {
204✔
880
        b = total_sql_num % nConcurrent;
204✔
881
    }
882

883
    //
884
    // running
885
    //
886
    int threadCnt = 0;
204✔
887
    for (int i = 0; i < nConcurrent; ++i) {
612✔
888
        qThreadInfo *pThreadInfo = infos + i;
408✔
889
        pThreadInfo->threadID    = i;
408✔
890
        pThreadInfo->start_sql   = start_sql;
408✔
891
        pThreadInfo->end_sql     = i < b ? start_sql + a : start_sql + a - 1;
408✔
892
        start_sql = pThreadInfo->end_sql + 1;
408✔
893
        pThreadInfo->total_delay = 0;
408✔
894
        pThreadInfo->dbName = dbName;
408✔
895

896
        // create conn
897
        if (initQueryConn(pThreadInfo, iface)){
408✔
898
            break;
×
899
        }
900
        // main run
901
        int code = pthread_create(pids + i, NULL, specQueryMixThread, pThreadInfo);
408✔
902
        if (code != 0) {
408✔
903
            errorPrint("failed specQueryMixThread create. error code =%d \n", code);
×
904
            break;
×
905
        }
906

907
        threadCnt ++;
408✔
908
    }
909

910
    bool needExit = false;
204✔
911
    // if failed, set termainte flag true like ctrl+c exit
912
    if (threadCnt != nConcurrent) {
204✔
913
        needExit = true;
×
914
        g_arguments->terminate = true;
×
915
    }
916

917
    // reset total
918
    g_queryInfo.specifiedQueryInfo.totalQueried = 0;
204✔
919
    g_queryInfo.specifiedQueryInfo.totalFail    = 0;
204✔
920

921
    int64_t start = toolsGetTimestampUs();
204✔
922
    for (int i = 0; i < threadCnt; ++i) {
612✔
923
        pthread_join(pids[i], NULL);
408✔
924
        qThreadInfo *pThreadInfo = infos + i;
408✔
925
        closeQueryConn(pThreadInfo, iface);
408✔
926

927
        // total queries
928
        g_queryInfo.specifiedQueryInfo.totalQueried += pThreadInfo->nSucc;
408✔
929
        if (g_arguments->continueIfFail == YES_IF_FAILED) {
408✔
930
            // yes need add failed count
931
            g_queryInfo.specifiedQueryInfo.totalQueried += pThreadInfo->nFail;
158✔
932
            g_queryInfo.specifiedQueryInfo.totalFail    += pThreadInfo->nFail;
158✔
933
        }
934

935
        // destory
936
        if (needExit) {
408✔
937
            benchArrayDestroy(pThreadInfo->query_delay_list);
×
938
            pThreadInfo->query_delay_list = NULL;
×
939
        }
940
    }
941
    int64_t end = toolsGetTimestampUs();
204✔
942

943
    // create
944
    if (needExit) {
204✔
945
        errorPrint("failed to create thread. expect nConcurrent=%d real threadCnt=%d,  exit testing.\n", nConcurrent, threadCnt);
×
946
        goto OVER;
×
947
    }
948

949
    // statistic
950
    totalChildQuery(infos, threadCnt, end - start, NULL);
204✔
951
    ret = 0;
204✔
952

953
OVER:
204✔
954
    tmfree(pids);
204✔
955
    tmfree(infos);
204✔
956

957
    // free sqls
958
    freeSpecialQueryInfo();
204✔
959

960
    return ret;
204✔
961
}
962

963
/*
 * totalBatchQuery - print the overall summary of a batch query run.
 *
 * allSleep  accumulated sleep time; printed divided by 1000 as seconds,
 *           only when it is positive
 * pDelays   array of int64_t delays; it is sorted in place with compare()
 *           and its values are printed divided by 1E6 as seconds
 *
 * The min/avg/p90/p95/p99/max line is printed only when pDelays is not empty.
 */
void totalBatchQuery(int32_t allSleep, BArray *pDelays) {
    // order the delays so that percentiles can be read by index
    qsort(pDelays->pData, pDelays->size, pDelays->elemSize, compare);

    // accumulate all delays for the average
    double delaySum = 0;
    size_t idx = 0;
    while (idx < pDelays->size) {
        int64_t *pItem = benchArrayGet(pDelays, idx);
        delaySum += *pItem;
        idx++;
    }

    printf("\n");

    // sleep time is only worth a line when the run actually slept
    if (allSleep > 0) {
        infoPrint("All sleep spend: %.3fs\n", (float)allSleep/1000);
    }

    // nothing more to show without samples
    if (!pDelays->size) {
        return;
    }

    int64_t *pMin = benchArrayGet(pDelays, 0);
    int64_t *pP90 = benchArrayGet(pDelays, (int32_t)(pDelays->size * 0.9));
    int64_t *pP95 = benchArrayGet(pDelays, (int32_t)(pDelays->size * 0.95));
    int64_t *pP99 = benchArrayGet(pDelays, (int32_t)(pDelays->size * 0.99));
    int64_t *pMax = benchArrayGet(pDelays, (int32_t)(pDelays->size - 1));

    infoPrint(
            "Total delay: "
            "min delay: %.6fs, "
            "avg delay: %.6fs, "
            "p90: %.6fs, "
            "p95: %.6fs, "
            "p99: %.6fs, "
            "max: %.6fs\n",
            *pMin/1E6,
            (double)delaySum/pDelays->size/1E6,
            *pP90/1E6,
            *pP95/1E6,
            *pP99/1E6,
            *pMax/1E6);
}
72✔
1002

1003
//
1004
// specQuery Mix Batch
1005
//
1006
static int specQueryBatch(uint16_t iface, char* dbName) {
132✔
1007
    // init
1008
    BArray *pDelays    = NULL;
132✔
1009
    int ret            = -1;
132✔
1010
    int nConcurrent    = g_queryInfo.specifiedQueryInfo.concurrent;
132✔
1011
    uint64_t interval  = g_queryInfo.specifiedQueryInfo.queryInterval;
132✔
1012
    pthread_t * pids   = benchCalloc(nConcurrent, sizeof(pthread_t), true);
132✔
1013
    qThreadInfo *infos = benchCalloc(nConcurrent, sizeof(qThreadInfo), true);
132✔
1014
    infoPrint("start batch query, sleep interval:%" PRIu64 "ms query times:%" PRIu64 " thread:%d \n",
132✔
1015
        interval, g_queryInfo.query_times, nConcurrent);
1016

1017
    // concurent calc
1018
    int total_sql_num = g_queryInfo.specifiedQueryInfo.sqls->size;
132✔
1019
    int start_sql     = 0;
132✔
1020
    int a             = total_sql_num / nConcurrent;
132✔
1021
    if (a < 1) {
132✔
1022
        warnPrint("sqls num:%d < concurent:%d, set concurrent %d\n", total_sql_num, nConcurrent, nConcurrent);
×
1023
        nConcurrent = total_sql_num;
×
1024
        a = 1;
×
1025
    }
1026
    int b = 0;
132✔
1027
    if (nConcurrent != 0) {
132✔
1028
        b = total_sql_num % nConcurrent;
132✔
1029
    }
1030

1031
    //
1032
    // connect
1033
    //
1034
    int connCnt = 0;
132✔
1035
    for (int i = 0; i < nConcurrent; ++i) {
492✔
1036
        qThreadInfo *pThreadInfo = infos + i;
420✔
1037
        pThreadInfo->dbName = dbName;
420✔
1038
        // create conn
1039
        if (initQueryConn(pThreadInfo, iface)){
420✔
1040
            ret = -1;
60✔
1041
            goto OVER;
60✔
1042
        }
1043

1044
        connCnt ++;
360✔
1045
    }
1046

1047

1048
    // reset total
1049
    g_queryInfo.specifiedQueryInfo.totalQueried = 0;
72✔
1050
    g_queryInfo.specifiedQueryInfo.totalFail    = 0;
72✔
1051

1052
    //
1053
    // running
1054
    //
1055
    int threadCnt = 0;
72✔
1056
    int allSleep  = 0;
72✔
1057
    pDelays       = benchArrayInit(10, sizeof(int64_t));
72✔
1058
    for (int m = 0; m < g_queryInfo.query_times; ++m) {
432✔
1059
        // reset
1060
        threadCnt = 0;
360✔
1061
        start_sql = 0;
360✔
1062

1063
        // create thread
1064
        for (int i = 0; i < nConcurrent; ++i) {
2,160✔
1065
            qThreadInfo *pThreadInfo = infos + i;
1,800✔
1066
            pThreadInfo->threadID    = i;
1,800✔
1067
            pThreadInfo->start_sql   = start_sql;
1,800✔
1068
            pThreadInfo->end_sql     = i < b ? start_sql + a : start_sql + a - 1;
1,800✔
1069
            start_sql = pThreadInfo->end_sql + 1;
1,800✔
1070
            pThreadInfo->total_delay = 0;
1,800✔
1071
            // total zero
1072
            pThreadInfo->nSucc = 0;
1,800✔
1073
            pThreadInfo->nFail = 0;
1,800✔
1074

1075
            // main run
1076
            int code = pthread_create(pids + i, NULL, specQueryMixThread, pThreadInfo);
1,800✔
1077
            if (code != 0) {
1,800✔
1078
                errorPrint("failed specQueryBatchThread create. error code =%d \n", code);
×
1079
                break;
×
1080
            }
1081

1082
            threadCnt ++;
1,800✔
1083
        }
1084

1085
        bool needExit = false;
360✔
1086
        if (threadCnt != nConcurrent) {
360✔
1087
            // if failed, set termainte flag true like ctrl+c exit
1088
            needExit = true;
×
1089
            g_arguments->terminate = true;
×
1090
        }
1091

1092
        // wait thread finished
1093
        int64_t start = toolsGetTimestampUs();
360✔
1094
        for (int i = 0; i < threadCnt; ++i) {
2,160✔
1095
            pthread_join(pids[i], NULL);
1,800✔
1096
            qThreadInfo *pThreadInfo = infos + i;
1,800✔
1097
            // total queries
1098
            g_queryInfo.specifiedQueryInfo.totalQueried += pThreadInfo->nSucc;
1,800✔
1099
            if (g_arguments->continueIfFail == YES_IF_FAILED) {
1,800✔
1100
                // yes need add failed count
1101
                g_queryInfo.specifiedQueryInfo.totalQueried += pThreadInfo->nFail;
×
1102
                g_queryInfo.specifiedQueryInfo.totalFail    += pThreadInfo->nFail;
×
1103
            }
1104

1105
            // destory
1106
            if (needExit) {
1,800✔
1107
                benchArrayDestroy(pThreadInfo->query_delay_list);
×
1108
                pThreadInfo->query_delay_list = NULL;
×
1109
            }
1110
        }
1111
        int64_t end = toolsGetTimestampUs();
360✔
1112

1113
        // create
1114
        if (needExit) {
360✔
1115
            errorPrint("failed to create thread. expect nConcurrent=%d real threadCnt=%d,  exit testing.\n", nConcurrent, threadCnt);
×
1116
            goto OVER;
×
1117
        }
1118

1119
        // batch total
1120
        printf("\n");
360✔
1121
        totalChildQuery(infos, threadCnt, end - start, pDelays);
360✔
1122

1123
        // show batch total
1124
        int64_t delay = end - start;
360✔
1125
        infoPrint("count:%d execute batch spend: %" PRId64 "ms\n", m + 1, delay/1000);
360✔
1126

1127
        // sleep
1128
        if ( g_queryInfo.specifiedQueryInfo.batchQuery && interval > 0) {
360✔
1129
            allSleep += autoSleep(interval, delay);
360✔
1130
        }
1131

1132
        // check cancel
1133
        if(g_arguments->terminate) {
360✔
1134
            break;
×
1135
        }
1136
    }
1137
    ret = 0;
72✔
1138

1139
    // all total
1140
    totalBatchQuery(allSleep, pDelays);
72✔
1141

1142
OVER:
132✔
1143
    // close conn
1144
    for (int i = 0; i < connCnt; ++i) {
492✔
1145
        qThreadInfo *pThreadInfo = infos + i;
360✔
1146
        closeQueryConn(pThreadInfo, iface);
360✔
1147
    }
1148

1149
    // free threads
1150
    tmfree(pids);
132✔
1151
    tmfree(infos);
132✔
1152

1153
    // free sqls
1154
    freeSpecialQueryInfo();
132✔
1155

1156
    // free delays
1157
    if (pDelays) {
132✔
1158
        benchArrayDestroy(pDelays);
72✔
1159
    }
1160

1161
    return ret;
132✔
1162
}
1163

1164
// total query for end
1165
void totalQuery(int64_t spends) {
1,665✔
1166
    // total QPS
1167
    uint64_t totalQueried = g_queryInfo.specifiedQueryInfo.totalQueried
1,665✔
1168
        + g_queryInfo.superQueryInfo.totalQueried;
1,665✔
1169

1170
    // error rate
1171
    char errRate[128] = "";
1,665✔
1172
    if(g_arguments->continueIfFail == YES_IF_FAILED) {
1,665✔
1173
        uint64_t totalFail = g_queryInfo.specifiedQueryInfo.totalFail + g_queryInfo.superQueryInfo.totalFail;
353✔
1174
        if (totalQueried > 0) {
353✔
1175
            (void)snprintf(errRate, sizeof(errRate), " ,error %" PRIu64 " (rate:%.3f%%)", totalFail, ((float)totalFail * 100)/totalQueried);
353✔
1176
        }
1177
    }
1178

1179
    // show
1180
    double  tInS = (double)spends / 1000;
1,665✔
1181
    char buf[512] = "";
1,665✔
1182
    (void)snprintf(buf, sizeof(buf),
×
1183
                "Spend %.4f second completed total queries: %" PRIu64
1184
                ", the QPS of all threads: %10.3f%s\n\n",
1185
                tInS, totalQueried, (double)totalQueried / tInS, errRate);
1,665✔
1186
    infoPrint("%s", buf);
1,665✔
1187
    infoPrintToFile("%s", buf);
1,665✔
1188
}
1,665✔
1189

1190
int queryTestProcess() {
1,832✔
1191
    prompt(0);
1,832✔
1192

1193
    // covert addr
1194
    if (g_queryInfo.iface == REST_IFACE) {
1,832✔
1195
        encodeAuthBase64();
449✔
1196
        char *host = g_arguments->host          ? g_arguments->host : DEFAULT_HOST;
449✔
1197
        int   port = g_arguments->port_inputted ? g_arguments->port : DEFAULT_REST_PORT;
449✔
1198
        if (convertHostToServAddr(host,
449✔
1199
                    port,
1200
                    &(g_arguments->serv_addr)) != 0) {
449✔
1201
            errorPrint("%s", "convert host to server address\n");
×
1202
            return -1;
×
1203
        }
1204
    }
1205

1206
    // kill sql for executing seconds over "kill_slow_query_threshold"
1207
    if (g_queryInfo.iface == TAOSC_IFACE && g_queryInfo.killQueryThreshold) {
1,832✔
1208
        int32_t ret = killSlowQuery();
79✔
1209
        if (ret != 0) {
79✔
1210
            return ret;
×
1211
        }
1212
    }
1213

1214
    // fetch child name if super table
1215
    if ((g_queryInfo.superQueryInfo.sqlCount > 0) &&
1,832✔
1216
            (g_queryInfo.superQueryInfo.threadCnt > 0)) {
657✔
1217
        int32_t ret = fetchChildTableName(g_queryInfo.dbName, g_queryInfo.superQueryInfo.stbName);
657✔
1218
        if (ret != 0) {
657✔
1219
            errorPrint("fetchChildTableName dbName=%s stb=%s failed.", g_queryInfo.dbName, g_queryInfo.superQueryInfo.stbName);
×
1220
            return -1;
×
1221
        }
1222
    }
1223

1224
    //
1225
    // start running
1226
    //
1227

1228
    uint64_t startTs = toolsGetTimestampMs();
1,832✔
1229
    if(g_queryInfo.specifiedQueryInfo.sqls && g_queryInfo.specifiedQueryInfo.sqls->size > 0) {
1,832✔
1230
        // specified table
1231
        if (g_queryInfo.specifiedQueryInfo.mixed_query) {
1,175✔
1232
            // mixed
1233
            if(g_queryInfo.specifiedQueryInfo.batchQuery) {
336✔
1234
                if (specQueryBatch(g_queryInfo.iface, g_queryInfo.dbName)) {
132✔
1235
                    return -1;
60✔
1236
                }
1237
            } else {
1238
                if (specQueryMix(g_queryInfo.iface, g_queryInfo.dbName)) {
204✔
1239
                    return -1;
×
1240
                }
1241
            }
1242
        } else {
1243
            // no mixied
1244
            if (specQuery(g_queryInfo.iface, g_queryInfo.dbName)) {
839✔
1245
                return -1;
107✔
1246
            }
1247
        }
1248
    } else if(g_queryInfo.superQueryInfo.sqlCount > 0) {
657✔
1249
        // super table
1250
        if (stbQuery(g_queryInfo.iface, g_queryInfo.dbName)) {
657✔
1251
            return -1;
×
1252
        }
1253
    } else {
1254
        // nothing
1255
        errorPrint("%s\n", "Both 'specified_table_query' and 'super_table_query' sqls is empty.");
×
1256
        return -1;
×
1257
    }
1258

1259
    // total
1260
    totalQuery(toolsGetTimestampMs() - startTs);
1,665✔
1261
    return g_fail ? -1 : 0;
1,665✔
1262
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc