• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3616

19 Feb 2025 11:22AM UTC coverage: 63.315% (+0.4%) from 62.953%
#3616

push

travis-ci

web-flow
Merge pull request #29823 from taosdata/feat/TS-5928

fix:[TS-5928]add consumer parameters

148228 of 300186 branches covered (49.38%)

Branch coverage included in aggregate %.

232358 of 300909 relevant lines covered (77.22%)

17990742.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/tools/shell/src/shellWebsocket.c
1

2
/*
3
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
4
 *
5
 * This program is free software: you can use, redistribute, and/or modify
6
 * it under the terms of the GNU Affero General Public License, version 3
7
 * or later ("AGPL"), as published by the Free Software Foundation.
8
 *
9
 * This program is distributed in the hope that it will be useful, but WITHOUT
10
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11
 * FITNESS FOR A PARTICULAR PURPOSE.
12
 *
13
 * You should have received a copy of the GNU Affero General Public License
14
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
15
 */
16
#ifdef WEBSOCKET
17
#include <taosws.h>
18
#include <shellInt.h>
19

20
// save current database name
21
char curDBName[128] = ""; // TDB_MAX_DBNAME_LEN is 24, put large
22

23
int shell_conn_ws_server(bool first) {
×
24
  char cuttedDsn[SHELL_WS_DSN_BUFF] = {0};
×
25
  int dsnLen = strlen(shell.args.dsn);
×
26
  snprintf(cuttedDsn,
×
27
           ((dsnLen-SHELL_WS_DSN_MASK) > SHELL_WS_DSN_BUFF)?
28
            SHELL_WS_DSN_BUFF:(dsnLen-SHELL_WS_DSN_MASK),
×
29
           "%s", shell.args.dsn);
30
  fprintf(stdout, "trying to connect %s****** ", cuttedDsn);
×
31
  fflush(stdout);
×
32
  for (int i = 0; i < shell.args.timeout; i++) {
×
33
    if(shell.args.is_bi_mode) {
×
34
      size_t len = strlen(shell.args.dsn);
×
35
      char * dsn = taosMemoryMalloc(len + 32);
×
36
      sprintf(dsn, "%s&conn_mode=1", shell.args.dsn);
×
37
      shell.ws_conn = ws_connect(dsn);
×
38
      taosMemoryFree(dsn);
×
39
    } else {
40
      shell.ws_conn = ws_connect(shell.args.dsn);
×
41
    }
42

43
    if (NULL == shell.ws_conn) {
×
44
      int errNo = ws_errno(NULL);
×
45
      if (0xE001 == errNo) {
×
46
        fprintf(stdout, ".");
×
47
        fflush(stdout);
×
48
        taosMsleep(1000);  // sleep 1 second then try again
×
49
        continue;
×
50
      } else {
51
        fprintf(stderr, "\nfailed to connect %s***, reason: %s\n",
×
52
            cuttedDsn, ws_errstr(NULL));
53
        return -1;
×
54
      }
55
    } else {
56
      break;
×
57
    }
58
  }
59
  if (NULL == shell.ws_conn) {
×
60
    fprintf(stdout, "\n timeout\n");
×
61
    fprintf(stderr, "\nfailed to connect %s***, reason: %s\n",
×
62
      cuttedDsn, ws_errstr(NULL));
63
    return -1;
×
64
  } else {
65
    fprintf(stdout, "\n");
×
66
  }
67
  if (first && shell.args.restful) {
×
68
    fprintf(stdout, "successfully connected to %s\n\n",
×
69
        shell.args.dsn);
70
  } else if (first && shell.args.cloud) {
×
71
    if(shell.args.local) {
×
72
      const char* host = strstr(shell.args.dsn, "@");
×
73
      if(host) {
×
74
        host += 1;
×
75
      } else {
76
        host = shell.args.dsn;
×
77
      }
78
      fprintf(stdout, "successfully connected to %s\n", host);
×
79
    } else {
80
      fprintf(stdout, "successfully connected to service\n");
×
81
    }
82
  }
83
  fflush(stdout);
×
84

85
  // switch to current database if have
86
  if(curDBName[0] !=0) {
×
87
    char command[256];
88
    sprintf(command, "use %s;", curDBName);
×
89
    shellRunSingleCommandWebsocketImp(command);
×
90
  }
91

92
  return 0;
×
93
}
94

95
static int horizontalPrintWebsocket(WS_RES* wres, double* execute_time) {
×
96
  const void* data = NULL;
×
97
  int rows;
98
  ws_fetch_raw_block(wres, &data, &rows);
×
99
  if (wres) {
×
100
    *execute_time += (double)(ws_take_timing(wres)/1E6);
×
101
  }
102
  if (!rows) {
×
103
    return 0;
×
104
  }
105
  int num_fields = ws_field_count(wres);
×
106
  TAOS_FIELD* fields = (TAOS_FIELD*)ws_fetch_fields(wres);
×
107
  int precision = ws_result_precision(wres);
×
108

109
  int width[TSDB_MAX_COLUMNS];
110
  for (int col = 0; col < num_fields; col++) {
×
111
    width[col] = shellCalcColWidth(fields + col, precision);
×
112
  }
113

114
  shellPrintHeader(fields, width, num_fields);
×
115

116
  int numOfRows = 0;
×
117
  do {
118
    uint8_t ty;
119
    uint32_t len;
120
    for (int i = 0; i < rows; i++) {
×
121
      for (int j = 0; j < num_fields; j++) {
×
122
        putchar(' ');
×
123
        const void *value = ws_get_value_in_block(wres, i, j, &ty, &len);
×
124
        shellPrintField((const char*)value, fields+j, width[j], len, precision);
×
125
        putchar(' ');
×
126
        putchar('|');
×
127
      }
128
      putchar('\r');
×
129
      putchar('\n');
×
130
    }
131
    numOfRows += rows;
×
132
    ws_fetch_raw_block(wres, &data, &rows);
×
133
  } while (rows && !shell.stop_query);
×
134
  return numOfRows;
×
135
}
136

137
static int verticalPrintWebsocket(WS_RES* wres, double* pexecute_time) {
×
138
  int rows = 0;
×
139
  const void* data = NULL;
×
140
  ws_fetch_raw_block(wres, &data, &rows);
×
141
  if (wres) {
×
142
    *pexecute_time += (double)(ws_take_timing(wres)/1E6);
×
143
  }
144
  if (!rows) {
×
145
    return 0;
×
146
  }
147
  int num_fields = ws_field_count(wres);
×
148
  TAOS_FIELD* fields = (TAOS_FIELD*)ws_fetch_fields(wres);
×
149
  int precision = ws_result_precision(wres);
×
150

151
  int maxColNameLen = 0;
×
152
  for (int col = 0; col < num_fields; col++) {
×
153
    int len = (int)strlen(fields[col].name);
×
154
    if (len > maxColNameLen) {
×
155
      maxColNameLen = len;
×
156
    }
157
  }
158
  int numOfRows = 0;
×
159
  do {
160
    uint8_t ty;
161
    uint32_t len;
162
    for (int i = 0; i < rows; i++) {
×
163
      printf("*************************** %d.row ***************************\n",
×
164
        numOfRows + 1);
165
      for (int j = 0; j < num_fields; j++) {
×
166
        TAOS_FIELD* field = fields + j;
×
167
        int padding = (int)(maxColNameLen - strlen(field->name));
×
168
        printf("%*.s%s: ", padding, " ", field->name);
×
169
        const void *value = ws_get_value_in_block(wres, i, j, &ty, &len);
×
170
        shellPrintField((const char*)value, field, 0, len, precision);
×
171
        putchar('\n');
×
172
      }
173
      numOfRows++;
×
174
    }
175
    ws_fetch_raw_block(wres, &data, &rows);
×
176
  } while (rows && !shell.stop_query);
×
177
  return numOfRows;
×
178
}
179

180
static int dumpWebsocketToFile(const char* fname, WS_RES* wres,
×
181
                               double* pexecute_time) {
182
  char fullname[PATH_MAX] = {0};
×
183
  if (taosExpandDir(fname, fullname, PATH_MAX) != 0) {
×
184
    tstrncpy(fullname, fname, PATH_MAX);
×
185
  }
186

187
  TdFilePtr pFile = taosOpenFile(fullname,
×
188
      TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
189
  if (pFile == NULL) {
×
190
    fprintf(stderr, "failed to open file: %s\r\n", fullname);
×
191
    return -1;
×
192
  }
193
  int rows = 0;
×
194
  const void* data = NULL;
×
195
  ws_fetch_raw_block(wres, &data, &rows);
×
196
  if (wres) {
×
197
    *pexecute_time += (double)(ws_take_timing(wres)/1E6);
×
198
  }
199
  if (!rows) {
×
200
    taosCloseFile(&pFile);
×
201
    return 0;
×
202
  }
203
  int numOfRows = 0;
×
204
  TAOS_FIELD* fields = (TAOS_FIELD*)ws_fetch_fields(wres);
×
205
  int num_fields = ws_field_count(wres);
×
206
  int precision = ws_result_precision(wres);
×
207
  for (int col = 0; col < num_fields; col++) {
×
208
    if (col > 0) {
×
209
      taosFprintfFile(pFile, ",");
×
210
    }
211
    taosFprintfFile(pFile, "%s", fields[col].name);
×
212
  }
213
  taosFprintfFile(pFile, "\r\n");
×
214
  do {
215
    uint8_t ty;
216
    uint32_t len;
217
    numOfRows += rows;
×
218
    for (int i = 0; i < rows; i++) {
×
219
      for (int j = 0; j < num_fields; j++) {
×
220
        if (j > 0) {
×
221
          taosFprintfFile(pFile, ",");
×
222
        }
223
        const void *value = ws_get_value_in_block(wres, i, j, &ty, &len);
×
224
        shellDumpFieldToFile(pFile, (const char*)value,
×
225
                             fields + j, len, precision);
×
226
      }
227
      taosFprintfFile(pFile, "\r\n");
×
228
    }
229
    ws_fetch_raw_block(wres, &data, &rows);
×
230
  } while (rows && !shell.stop_query);
×
231
  taosCloseFile(&pFile);
×
232
  return numOfRows;
×
233
}
234

235
static int shellDumpWebsocket(WS_RES *wres, char *fname,
×
236
                              int *error_no, bool vertical,
237
                              double* pexecute_time) {
238
  int numOfRows = 0;
×
239
  if (fname != NULL) {
×
240
    numOfRows = dumpWebsocketToFile(fname, wres, pexecute_time);
×
241
  } else if (vertical) {
×
242
    numOfRows = verticalPrintWebsocket(wres, pexecute_time);
×
243
  } else {
244
    numOfRows = horizontalPrintWebsocket(wres, pexecute_time);
×
245
  }
246
  *error_no = ws_errno(wres);
×
247
  return numOfRows;
×
248
}
249

250
char * strendG(const char* pstr);
251
void shellRunSingleCommandWebsocketImp(char *command) {
×
252
  int64_t st, et;
253
  char   *sptr = NULL;
×
254
  char   *cptr = NULL;
×
255
  char   *fname = NULL;
×
256
  bool    printMode = false;
×
257

258
  if ((sptr = strstr(command, ">>")) != NULL) {
×
259
    fname = sptr + 2;
×
260
    while (*fname == ' ') fname++;
×
261
    *sptr = '\0';
×
262

263
    cptr = strstr(fname, ";");
×
264
    if (cptr != NULL) {
×
265
      *cptr = '\0';
×
266
    }
267
  }
268

269
  if ((sptr = strendG(command)) != NULL) {
×
270
    *sptr = '\0';
×
271
    printMode = true;  // When output to a file, the switch does not work.
×
272
  }
273

274
  shell.stop_query = false;
×
275
  WS_RES* res;
276

277
  for (int reconnectNum = 0; reconnectNum < 2; reconnectNum++) {
×
278
    if (!shell.ws_conn && shell_conn_ws_server(0) || shell.stop_query) {
×
279
      return;
×
280
    }
281
    st = taosGetTimestampUs();
×
282

283
    res = ws_query_timeout(shell.ws_conn, command, shell.args.timeout);
×
284
    int code = ws_errno(res);
×
285
    if (code != 0 && !shell.stop_query) {
×
286
      // if it's not a ws connection error
287
      if (TSDB_CODE_WS_DSN_ERROR != (code&TSDB_CODE_WS_DSN_ERROR)) {
×
288
        et = taosGetTimestampUs();
×
289
        fprintf(stderr, "\nDB: error:0x%08X %s (%.6fs)\n",
×
290
                ws_errno(res), ws_errstr(res), (et - st)/1E6);
×
291
        ws_free_result(res);
×
292
        return;
×
293
      }
294
      if (code == TSDB_CODE_WS_SEND_TIMEOUT
×
295
                || code == TSDB_CODE_WS_RECV_TIMEOUT) {
×
296
        fprintf(stderr, "Hint: use -T to increase the timeout in seconds\n");
×
297
      } else if (code == TSDB_CODE_WS_INTERNAL_ERRO
×
298
                    || code == TSDB_CODE_WS_CLOSED) {
×
299
        shell.ws_conn = NULL;
×
300
      }
301
      ws_free_result(res);
×
302
      if (reconnectNum == 0) {
×
303
        continue;
×
304
      } else {
305
        fprintf(stderr, "The server is disconnected, will try to reconnect\n");
×
306
      }
307
      return;
×
308
    }
309
    break;
×
310
  }
311

312
  double execute_time = 0;
×
313
  if (res) {
×
314
    execute_time = ws_take_timing(res)/1E6;
×
315
  }
316

317
  if (shellRegexMatch(command, "^\\s*use\\s+[a-zA-Z0-9_]+\\s*;\\s*$",
×
318
                      REG_EXTENDED | REG_ICASE)) {
319

320
    // copy dbname to curDBName
321
    char *p         = command;
×
322
    bool firstStart = false;
×
323
    bool firstEnd   = false;
×
324
    int  i          = 0;
×
325
    while (*p != 0) {
×
326
      if (*p != ' ') {
×
327
        // not blank
328
        if (!firstStart) {
×
329
          firstStart = true;
×
330
        } else if (firstEnd) {
×
331
          if(*p == ';' && *p != '\\') {
×
332
            break;
×
333
          }
334
          // database name
335
          curDBName[i++] = *p;
×
336
          if(i + 4 > sizeof(curDBName)) {
×
337
            // DBName is too long, reset zero and break
338
            i = 0;
×
339
            break;
×
340
          }
341
        }
342
      } else {
343
        // blank
344
        if(firstStart == true && firstEnd == false){
×
345
          firstEnd = true;
×
346
        }
347
        if(firstStart && firstEnd && i > 0){
×
348
          // blank after database name
349
          break;
×
350
        }
351
      }
352
      // move next
353
      p++;
×
354
    }
355
    // append end
356
    curDBName[i] = 0;
×
357

358
    fprintf(stdout, "Database changed to %s.\r\n\r\n", curDBName);
×
359
    fflush(stdout);
×
360
    ws_free_result(res);
×
361
    return;
×
362
  }
363

364
  int numOfRows = 0;
×
365
  if (ws_is_update_query(res)) {
×
366
    numOfRows = ws_affected_rows(res);
×
367
    et = taosGetTimestampUs();
×
368
    double total_time = (et - st)/1E3;
×
369
    double net_time = total_time - (double)execute_time;
×
370
    printf("Query Ok, %d of %d row(s) in database\n", numOfRows, numOfRows);
×
371
    printf("Execute: %.2f ms Network: %.2f ms Total: %.2f ms\n",
×
372
           execute_time, net_time, total_time);
373
  } else {
374
    int error_no = 0;
×
375
    numOfRows  = shellDumpWebsocket(res, fname, &error_no,
×
376
                                    printMode, &execute_time);
377
    if (numOfRows < 0) {
×
378
      ws_free_result(res);
×
379
      return;
×
380
    }
381
    et = taosGetTimestampUs();
×
382
    double total_time = (et - st) / 1E3;
×
383
    double net_time = total_time - execute_time;
×
384
    if (error_no == 0 && !shell.stop_query) {
×
385
      printf("Query OK, %d row(s) in set\n", numOfRows);
×
386
      printf("Execute: %.2f ms Network: %.2f ms Total: %.2f ms\n",
×
387
             execute_time, net_time, total_time);
388
    } else {
389
      printf("Query interrupted, %d row(s) in set (%.6fs)\n", numOfRows,
×
390
          (et - st)/1E6);
×
391
    }
392
  }
393
  printf("\n");
×
394
  ws_free_result(res);
×
395
}
396
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc