• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 12350839782

16 Dec 2024 10:36AM UTC coverage: 82.0%. First build
12350839782

Pull #14419

github

web-flow
Merge 67b1fa067 into 2d6b729d2
Pull Request #14419: fix: clickhouse split sql re

10 of 12 new or added lines in 1 file covered. (83.33%)

56425 of 68811 relevant lines covered (82.0%)

15102.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.0
/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%--------------------------------------------------------------------
4

5
-module(emqx_bridge_clickhouse_connector).
6

7
-include_lib("emqx_connector/include/emqx_connector.hrl").
8
-include_lib("emqx_resource/include/emqx_resource.hrl").
9
-include_lib("typerefl/include/types.hrl").
10
-include_lib("emqx/include/logger.hrl").
11
-include_lib("hocon/include/hoconsc.hrl").
12
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
13

14
-behaviour(emqx_resource).
15

16
-import(hoconsc, [mk/2, enum/1, ref/2]).
17

18
%%=====================================================================
19
%% Exports
20
%%=====================================================================
21

22
%% Hocon config schema exports
23
-export([
24
    roots/0,
25
    fields/1,
26
    values/1,
27
    namespace/0
28
]).
29

30
%% callbacks for behaviour emqx_resource
31
-export([
32
    resource_type/0,
33
    callback_mode/0,
34
    on_start/2,
35
    on_stop/2,
36
    on_add_channel/4,
37
    on_remove_channel/3,
38
    on_get_channel_status/3,
39
    on_get_channels/1,
40
    on_query/3,
41
    on_batch_query/3,
42
    on_get_status/2,
43
    on_format_query_result/1
44
]).
45

46
%% callbacks for ecpool
47
-export([connect/1]).
48

49
%% Internal exports used to execute code with ecpool worker
50
-export([
51
    execute_sql_in_clickhouse_server_using_connection/2
52
]).
53

54
-ifdef(TEST).
55
-export([split_clickhouse_insert_sql/1]).
56
-endif.
57

58
%%=====================================================================
59
%% Types
60
%%=====================================================================
61

62
-type url() :: emqx_http_lib:uri_map().
63
-reflect_type([url/0]).
64
-typerefl_from_string({url/0, emqx_http_lib, uri_parse}).
65

66
-type templates() ::
67
    #{}
68
    | #{
69
        send_message_template := term(),
70
        extend_send_message_template := term()
71
    }.
72

73
%% Connector state as built by on_start/2 and extended by on_add_channel/4.
%% on_start/2 creates only `channels`, `pool_name` and `connect_timeout`;
%% there is no top-level `templates` key. Each channel entry stores the
%% prepared templates together with the channel's `parameters` config.
-type state() ::
    #{
        channels => #{
            binary() => #{
                templates := templates(),
                channel_conf := map()
            }
        },
        pool_name := binary(),
        connect_timeout := pos_integer()
    }.
80

81
-type clickhouse_config() :: map().
82

83
%%=====================================================================
84
%% Macros and On load
85
%%=====================================================================
86

87
%% Copied from emqx_utils_sql:parse_insert/1
88
%% Can also handle Clickhouse's SQL extension for INSERT statements that allows the
89
%% user to specify different formats:
90
%%
91
%% https://clickhouse.com/docs/en/sql-reference/statements/insert-into/
92
%%
93
-define(INSERT_RE_MP_KEY, {?MODULE, insert_re_mp}).
94
%% Regular expression (as a binary) used to split a ClickHouse INSERT
%% template into its "head" (capture group 1) and its value part
%% (capture group 2 for `VALUES ...`, capture group 3 for `FORMAT <NAME> ...`).
-define(INSERT_RE_BIN, <<
    %% case-insensitive
    "(?i)^",
    %% Leading spaces
    "\\s*",
    %% Group-1: insert into, table name and columns (when existed).
    %% All space characters suffixed to <TABLE_NAME> will be kept
    %% `INSERT INTO <TABLE_NAME> [(<COLUMN>, ..)]`
    "(insert\\s+into\\s+[^\\s\\(\\)]+\\s*(?:(?:\\((?:[^()]++|(?2))*\\)\\s*,?\\s*)*))",
    "\\s*",
    %% Ignore Group
    "(?:",
    %% Group-2 (Optional for FORMAT clause):
    %% literals value(s) or placeholder(s) with round brackets.
    %% And the sub-pattern in brackets does not do any capturing
    %% Ignore Group:
    %%     `VALUES [([<VALUE> | <PLACEHOLDER>], ...)]`
    %% Keep Capturing-Group:
    %%     `([<VALUE> | <PLACEHOLDER>], ...) [, ([<VALUE> | <PLACEHOLDER>], ..)]`
    %% Every tuple (the first one and each following one) uses the same
    %% `(?:[^()]++|(?2))*` body so that nested parentheses, e.g. `now()`,
    %% are accepted in all tuples and not only in the first.
    "(?:values\\s*(\\((?:[^()]++|(?2))*\\)(?:\\s*,\\s*\\((?:[^()]++|(?2))*\\))*))",
    %% End Group-2
    %% or
    "|",
    %% Group-3:
    %% literals value(s) or placeholder(s) as `<FORMAT_DATA>`
    %% Ignore Group:
    %%     `FORMAT <FORMAT_NAME> <FORMAT_DATA>`
    %% Keep Capturing-Group `<FORMAT_DATA>` without any check
    %%   Could be:
    %%     `([<VALUE> | <PLACEHOLDER>], ...) [, ([<VALUE> | <PLACEHOLDER>], ...)]`
    %%     `[([<VALUE> | <PLACEHOLDER>], ...) [, ([<VALUE> | <PLACEHOLDER>], ...)]]`
    %%     ...
    "(?:format\\s+[a-zA-Z]+\\s+)((?!\\s)(?=.*\\s).*)",
    %% End Group-3
    ")",
    %% End Ignored Group
    "\\s*$"
>>).
132

133
-on_load(on_load/0).
134

135
%% Module load hook (registered through the -on_load attribute): stores the
%% compiled INSERT regex by calling put_insert_mp/0.
on_load() ->
    ok = put_insert_mp().
413✔
138

139
%% Compiles ?INSERT_RE_BIN and stores the raw result of re:compile/1
%% (i.e. the whole `{ok, MP}` tuple) in persistent_term under
%% ?INSERT_RE_MP_KEY. Always returns `ok`.
put_insert_mp() ->
    CompileResult = re:compile(?INSERT_RE_BIN),
    ok = persistent_term:put(?INSERT_RE_MP_KEY, CompileResult).
413✔
142

143
%% Returns `{ok, MP}` with the compiled INSERT regex read from
%% persistent_term. When nothing is stored yet, the regex is compiled and
%% stored first and the lookup is retried.
get_insert_mp() ->
    case persistent_term:get(?INSERT_RE_MP_KEY, undefined) of
        {ok, _MP} = Compiled ->
            Compiled;
        undefined ->
            ok = put_insert_mp(),
            get_insert_mp()
    end.
151

152
%%=====================================================================
153
%% Configuration and default values
154
%%=====================================================================
155

156
%% Hocon schema namespace of this module.
namespace() ->
    clickhouse.
×
157

158
%% Hocon schema roots: a single `config` root referring to fields(config).
roots() ->
    ConfigRef = hoconsc:ref(?MODULE, config),
    [{config, #{type => ConfigRef}}].
4✔
160

161
%% Schema fields of the `config` root: the ClickHouse base URL (a parsed URL
%% that carries a query component is rejected), the connect timeout
%% (default "15s"), followed by the shared relational database fields.
fields(config) ->
    UrlValidator =
        fun
            (#{query := _Query}) ->
                {error, "There must be no query in the url"};
            (_) ->
                ok
        end,
    UrlField =
        {url,
            hoconsc:mk(
                url(),
                #{
                    required => true,
                    validator => UrlValidator,
                    desc => ?DESC("base_url")
                }
            )},
    ConnectTimeoutField =
        {connect_timeout,
            hoconsc:mk(
                emqx_schema:timeout_duration_ms(),
                #{
                    default => <<"15s">>,
                    desc => ?DESC("connect_timeout")
                }
            )},
    [UrlField, ConnectTimeoutField | emqx_connector_schema_lib:relational_db_fields()].
13,104✔
186

187
%% Example values keyed by HTTP method: `put` yields the base example,
%% `post` and `get` add a `name`, anything else yields an empty map.
values(put) ->
    #{
        database => <<"mqtt">>,
        enable => true,
        pool_size => 8,
        type => clickhouse,
        url => <<"http://127.0.0.1:8123">>
    };
values(Method) when Method =:= post; Method =:= get ->
    Base = values(put),
    Base#{name => <<"connector">>};
values(_) ->
    #{}.
×
201

202
%% ===================================================================
203
%% Callbacks defined in emqx_resource
204
%% ===================================================================
205
%% emqx_resource callback: the resource type handled by this module.
resource_type() ->
    clickhouse.
9✔
206

207
%% emqx_resource callback: this connector only offers synchronous queries.
callback_mode() ->
    always_sync.
9✔
208

209
%% -------------------------------------------------------------------
210
%% on_start callback and related functions
211
%% -------------------------------------------------------------------
212

213
%% Starts the connection pool for this connector instance through
%% emqx_resource_pool:start/3. Returns `{ok, State}` with an empty channel
%% map on success, or `{error, Reason}` when the pool reports an error or
%% when anything raises while starting it.
-spec on_start(resource_id(), clickhouse_config()) -> {ok, state()} | {error, _}.

on_start(
    InstanceID,
    #{
        url := URL,
        database := DB,
        pool_size := PoolSize,
        connect_timeout := ConnectTimeout
    } = Config
) ->
    ?SLOG(info, #{
        msg => "starting_clickhouse_connector",
        connector => InstanceID,
        config => emqx_utils:redact(Config)
    }),
    %% Credentials fall back to defaults when they are not configured
    Username = maps:get(username, Config, "default"),
    Password = maps:get(password, Config, emqx_secret:wrap("public")),
    PoolOptions = [
        {url, URL},
        {user, Username},
        {key, Password},
        {database, DB},
        {auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
        {pool_size, PoolSize},
        {pool, InstanceID}
    ],
    InitialState = #{
        channels => #{},
        pool_name => InstanceID,
        connect_timeout => ConnectTimeout
    },
    try
        case emqx_resource_pool:start(InstanceID, ?MODULE, PoolOptions) of
            ok ->
                {ok, InitialState};
            {error, StartError} ->
                ?tp(
                    info,
                    "clickhouse_connector_start_failed",
                    #{
                        error => StartError,
                        config => emqx_utils:redact(Config)
                    }
                ),
                {error, StartError}
        end
    catch
        _:Raised:Stacktrace ->
            ?tp(
                info,
                "clickhouse_connector_start_failed",
                #{
                    error => Raised,
                    stacktrace => Stacktrace,
                    config => emqx_utils:redact(Config)
                }
            ),
            {error, Raised}
    end.
271

272
%% Helper functions to prepare SQL templates
273

274
%% Builds the two pre-processed templates for a channel: one for a complete
%% single INSERT and one for the extra value part appended per additional
%% row in a batch. Returns an empty map when the config has no `sql` /
%% `batch_value_separator` keys.
prepare_sql_templates(#{sql := SQL, batch_value_separator := BatchSeparator}) ->
    SingleInsert = emqx_placeholder:preproc_tmpl(SQL),
    BatchExtension = prepare_sql_bulk_extend_template(SQL, BatchSeparator),
    #{
        send_message_template => SingleInsert,
        extend_send_message_template => BatchExtension
    };
prepare_sql_templates(_) ->
    %% We don't create any templates if this is a non-bridge connector
    #{}.
×
287

288
%% Extracts the value part of the INSERT template and prefixes it with the
%% batch separator, so the pre-processed result can be appended directly to
%% a rendered insert statement.
prepare_sql_bulk_extend_template(Template, Separator) ->
    ValuesPart = split_clickhouse_insert_sql(Template),
    Extension = iolist_to_binary([Separator, ValuesPart]),
    emqx_placeholder:preproc_tmpl(Extension).
7✔
295

296
%% Returns the value part of an INSERT statement: the tuples following
%% `VALUES`, or the data following `FORMAT <NAME>`. Raises an error carrying
%% a descriptive binary when SQL does not match the INSERT regex.
split_clickhouse_insert_sql(SQL) ->
    {ok, MP} = get_insert_mp(),
    case re:run(SQL, MP, [{capture, all_but_first, binary}]) of
        %% `VALUES` statement: only groups 1 and 2 are returned
        {match, [_InsertInto, ValuesPart]} ->
            ValuesPart;
        %% Group2 is empty (not `VALUES` statement)
        {match, [_InsertInto, <<>>, FormatPart]} ->
            FormatPart;
        _ ->
            erlang:error(
                <<"The SQL template should be an SQL INSERT statement but it is something else.">>
            )
    end.
308

309
% This is a callback for ecpool which is triggered by the call to
310
% emqx_resource_pool:start in on_start/2
311
%% Starts one ClickHouse client connection from the pool options and checks
%% it with clickhouse:detailed_status/1. A connection that fails the check
%% is stopped again and the check result is returned instead.
connect(Options) ->
    Opt = fun(Name) -> proplists:get_value(Name, Options) end,
    URL = iolist_to_binary(emqx_http_lib:normalize(Opt(url))),
    User = Opt(user),
    Database = Opt(database),
    %% TODO: teach `clickhouse` to accept 0-arity closures as passwords.
    Secret = emqx_secret:unwrap(Opt(key)),
    Pool = Opt(pool),
    PoolSize = Opt(pool_size),
    ClientOptions = [
        {url, URL},
        {database, Database},
        {user, User},
        {key, Secret},
        {pool, Pool},
        {pool_size, PoolSize}
    ],
    case clickhouse:start_link(ClientOptions) of
        {error, _Reason} = StartError ->
            StartError;
        {ok, Connection} ->
            %% Check if we can connect and send a query
            case clickhouse:detailed_status(Connection) of
                ok ->
                    {ok, Connection};
                StatusError ->
                    ok = clickhouse:stop(Connection),
                    StatusError
            end
    end.
340

341
%% -------------------------------------------------------------------
342
%% on_stop emqx_resource callback
343
%% -------------------------------------------------------------------
344

345
%% Logs the shutdown and stops the resource pool named after the instance.
-spec on_stop(resource_id(), resource_state()) -> term().

on_stop(InstanceID, _State) ->
    LogData = #{
        msg => "stopping clickouse connector",
        connector => InstanceID
    },
    ?SLOG(info, LogData),
    emqx_resource_pool:stop(InstanceID).
353

354
%% -------------------------------------------------------------------
355
%% channel related emqx_resource callbacks
356
%% -------------------------------------------------------------------
357
%% Registers a channel in the connector state: prepares the SQL templates
%% from the channel's `parameters` and stores them next to that config.
on_add_channel(_InstId, #{channels := Channs} = OldState, ChannId, ChannConf0) ->
    #{parameters := Params} = ChannConf0,
    ChannelState = #{
        templates => prepare_sql_templates(Params),
        channel_conf => Params
    },
    {ok, OldState#{channels => maps:put(ChannId, ChannelState, Channs)}}.
363

364
%% Drops a channel from the connector state; unknown channel ids are ignored.
on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannId) ->
    Remaining = maps:remove(ChannId, Channels),
    {ok, State#{channels := Remaining}}.
367

368
%% A channel shares the status of its connector; the disconnect reason
%% reported by on_get_status/2 is not forwarded.
on_get_channel_status(InstanceId, _ChannId, State) ->
    ConnectorStatus = on_get_status(InstanceId, State),
    case ConnectorStatus of
        {?status_disconnected, _Reason} -> ?status_disconnected;
        ?status_connected -> ?status_connected
    end.
373

374
%% Delegates to emqx_bridge_v2 to list the channels of this connector.
on_get_channels(InstanceId) ->
    Channels = emqx_bridge_v2:get_channels_for_connector(InstanceId),
    Channels.
376

377
%% -------------------------------------------------------------------
378
%% on_get_status emqx_resource callback and related functions
379
%% -------------------------------------------------------------------
380

381
%% Maps the pool health check onto a resource status: connected when every
%% worker check succeeds, otherwise disconnected together with the reason.
on_get_status(_InstanceID, #{pool_name := PoolName, connect_timeout := Timeout}) ->
    case do_get_status(PoolName, Timeout) of
        {error, Reason} ->
            {?status_disconnected, Reason};
        ok ->
            ?status_connected
    end.
391

392
%% Runs clickhouse:detailed_status/1 on every pool worker in parallel.
%% Returns `ok` when no worker reports an error, the first `{error, Reason}`
%% found in the results otherwise, or `{error, timeout}` when the parallel
%% map exits with `timeout`.
do_get_status(PoolName, Timeout) ->
    CheckWorker =
        fun(Worker) ->
            case ecpool_worker:exec(Worker, fun clickhouse:detailed_status/1, Timeout) of
                ok ->
                    ok;
                {error, Reason} = Failure ->
                    ?SLOG(error, #{
                        msg => "clickhouse_connector_get_status_failed",
                        reason => Reason,
                        worker => Worker
                    }),
                    Failure
            end
        end,
    Workers = [Pid || {_Name, Pid} <- ecpool:workers(PoolName)],
    try emqx_utils:pmap(CheckWorker, Workers, Timeout) of
        Results ->
            case [Failed || {error, _} = Failed <- Results] of
                [] ->
                    ok;
                [FirstFailure | _] ->
                    FirstFailure
            end
    catch
        exit:timeout ->
            ?SLOG(error, #{
                msg => "clickhouse_connector_pmap_failed",
                reason => timeout
            }),
            {error, timeout}
    end.
424

425
%% -------------------------------------------------------------------
426
%% on_query emqx_resource callback and related functions
427
%% -------------------------------------------------------------------
428

429
%% Handles a single request. The first tuple element is either a channel id
%% (binary), in which case the data is rendered through the channel's insert
%% template, or `sql` / `query`, in which case the second element is sent
%% as-is.
-spec on_query
    (resource_id(), Request, resource_state()) -> query_result() when
        Request :: {ChannId, Data},
        ChannId :: binary(),
        Data :: map();
    (resource_id(), Request, resource_state()) -> query_result() when
        Request :: {RequestType, SQL},
        RequestType :: sql | query,
        SQL :: binary().

on_query(
    ResourceID,
    {RequestType, DataOrSQL},
    #{pool_name := PoolName} = State
) ->
    ?SLOG(debug, #{
        msg => "clickhouse_connector_received_sql_query",
        connector => ResourceID,
        type => RequestType,
        sql => DataOrSQL,
        state => State
    }),
    %% Have we got a query or data to fit into an SQL template?
    Kind = query_type(RequestType),
    ChannelState = get_channel_state(RequestType, State),
    Templates = maps:get(templates, ChannelState, #{}),
    ChannelConf = maps:get(channel_conf, ChannelState, #{}),
    SQL = get_sql(Kind, Templates, DataOrSQL, ChannelConf),
    Reply = execute_sql_in_clickhouse_server(RequestType, PoolName, SQL),
    transform_and_log_clickhouse_result(Reply, ResourceID, SQL).
460

461
%% Templates stored for a channel, or an empty map when there are none.
get_templates(ChannId, State) ->
    ChannelState = get_channel_state(ChannId, State),
    maps:get(templates, ChannelState, #{}).
463

464
%% Per-channel state stored under `channels`; an empty map when the state
%% has no `channels` key or the channel id is unknown.
get_channel_state(ChannId, State) ->
    Channels = maps:get(channels, State, #{}),
    maps:get(ChannId, Channels, #{}).
471

472
%% Produces the SQL to send: channel messages are rendered through the
%% prepared insert template; any other request already carries its SQL.
get_sql(channel_message, #{send_message_template := InsertTemplate}, Message, ChannelConf) ->
    proc_nullable_tmpl(InsertTemplate, Message, ChannelConf);
get_sql(_Kind, _Templates, RawSQL, _ChannelConf) ->
    RawSQL.
476

477
%% Classifies a request tag: a binary is a channel id, so its data goes
%% through the prepared template; `sql` and `query` both mean a raw query.
query_type(ChannId) when is_binary(ChannId) ->
    channel_message;
query_type(Tag) when Tag =:= sql; Tag =:= query ->
    query.
484

485
%% -------------------------------------------------------------------
486
%% on_batch_query emqx_resource callback and related functions
487
%% -------------------------------------------------------------------
488

489
%% Renders a batch of channel messages into one INSERT statement (using the
%% templates of the first request's channel) and executes it.
-spec on_batch_query(resource_id(), BatchReq, resource_state()) -> query_result() when
    BatchReq :: nonempty_list({binary(), map()}).

on_batch_query(ResourceID, BatchReq, #{pool_name := PoolName} = State) ->
    %% Currently we only support batch requests with a binary ChannId
    {[ChannId | _] = Keys, Rows} = lists:unzip(BatchReq),
    ensure_channel_messages(Keys),
    ChannelState = get_channel_state(ChannId, State),
    Templates = maps:get(templates, ChannelState, #{}),
    ChannelConf = maps:get(channel_conf, ChannelState, #{}),
    %% Create batch insert SQL statement
    SQL = objects_to_sql(Rows, Templates, ChannelConf),
    %% Do the actual query in the database
    Reply = execute_sql_in_clickhouse_server(ChannId, PoolName, SQL),
    %% Transform the result to a better format
    transform_and_log_clickhouse_result(Reply, ResourceID, SQL).
504

505
%% Returns `ok` when every batch key is a binary channel id; raises an
%% `unrecoverable_error` otherwise.
ensure_channel_messages(Keys) ->
    lists:all(fun erlang:is_binary/1, Keys) orelse
        erlang:error(
            {unrecoverable_error, <<"Unexpected type for batch message (Expected channel-id)">>}
        ),
    ok.
514

515
%% Builds one INSERT statement for a non-empty batch: the first object is
%% rendered with the full insert template, every following object with the
%% "extend" template, and all parts are concatenated into a binary.
%% Raises an error when the object list is empty or the templates are missing.
objects_to_sql(
    [FirstObject | RemainingObjects] = _ObjectsToInsert,
    #{
        send_message_template := InsertTemplate,
        extend_send_message_template := BulkExtendInsertTemplate
    },
    ChannelConf
) ->
    %% Prepare INSERT-statement and the first row after VALUES
    Head = proc_nullable_tmpl(InsertTemplate, FirstObject, ChannelConf),
    Tail = [
        proc_nullable_tmpl(BulkExtendInsertTemplate, Object, ChannelConf)
     || Object <- RemainingObjects
    ],
    erlang:iolist_to_binary([Head, Tail]);
objects_to_sql(_, _, _) ->
    erlang:error(<<"Templates for bulk insert missing.">>).
534

535
%% Renders a template with emqx_placeholder; the nullable variant is used
%% only when the channel config sets `undefined_vars_as_null` to `true`.
proc_nullable_tmpl(Template, Data, ChannelConf) ->
    case ChannelConf of
        #{undefined_vars_as_null := true} ->
            emqx_placeholder:proc_nullable_tmpl(Template, Data);
        _ ->
            emqx_placeholder:proc_tmpl(Template, Data)
    end.
539

540
%% -------------------------------------------------------------------
541
%% Helper functions that are used by both on_query/3 and on_batch_query/3
542
%% -------------------------------------------------------------------
543

544
%% This function is used by on_query/3 and on_batch_query/3 to send a query to
545
%% the database server and receive a result
546
%% Records the rendered SQL for tracing, then runs it on a pool worker
%% picked by ecpool.
execute_sql_in_clickhouse_server(Id, PoolName, SQL) ->
    emqx_trace:rendered_action_template(Id, #{rendered_sql => SQL}),
    Action = {?MODULE, execute_sql_in_clickhouse_server_using_connection, [SQL]},
    ecpool:pick_and_do(PoolName, Action, no_handover).
553

554
%% Invoked through ecpool with a worker's connection: sends SQL with no
%% extra query options.
execute_sql_in_clickhouse_server_using_connection(Connection, SQL) ->
    NoOptions = [],
    clickhouse:query(Connection, SQL, NoOptions).
556

557
%% This function transforms the result received from clickhouse to something
558
%% that is a little bit more readable and creates appropriate log messages
559
%% Normalizes a ClickHouse reply. A 200/204 response becomes `ok` (empty
%% body) or `{ok, Body}`. Anything else is logged and wrapped as either a
%% recoverable or an unrecoverable error tuple.
transform_and_log_clickhouse_result({ok, ResponseCode, Body}, _, _) when
    ResponseCode =:= 200; ResponseCode =:= 204
->
    Result =
        case Body of
            <<"">> -> ok;
            _ -> {ok, Body}
        end,
    snabbkaffe_log_return(Result),
    Result;
transform_and_log_clickhouse_result(ClickhouseErrorResult, ResourceID, SQL) ->
    ?SLOG(error, #{
        msg => "clickhouse_connector_do_sql_query_failed",
        connector => ResourceID,
        sql => SQL,
        reason => ClickhouseErrorResult
    }),
    %% TODO: The hackney errors that the clickhouse library forwards are
    %% very loosely defined. We should try to make sure that the following
    %% handles all error cases that we need to handle as recoverable_error
    case is_recoverable_error(ClickhouseErrorResult) of
        false ->
            ?SLOG(error, #{
                msg => "clickhouse_connector_sql_query_failed_unrecoverable",
                recoverable_error => false,
                connector => ResourceID,
                sql => SQL,
                reason => ClickhouseErrorResult
            }),
            to_error_tuple(ClickhouseErrorResult);
        true ->
            ?SLOG(warning, #{
                msg => "clickhouse_connector_sql_query_failed_recoverable",
                recoverable_error => true,
                connector => ResourceID,
                sql => SQL,
                reason => ClickhouseErrorResult
            }),
            to_recoverable_error(ClickhouseErrorResult)
    end.
600

601
%% Formats successful results as a `#{result, message}` map; any other
%% result is passed through unchanged.
on_format_query_result(Result) ->
    case Result of
        ok ->
            #{result => ok, message => <<"">>};
        {ok, Message} ->
            #{result => ok, message => Message};
        _ ->
            Result
    end.
607

608
%% Wraps a failure as `{error, {recoverable_error, Reason}}`, unwrapping an
%% `{error, Reason}` input first.
to_recoverable_error(Failure) ->
    Reason =
        case Failure of
            {error, Inner} -> Inner;
            Other -> Other
        end,
    {error, {recoverable_error, Reason}}.
612

613
%% Wraps a failure as `{error, {unrecoverable_error, Reason}}`, unwrapping
%% an `{error, Reason}` input first.
to_error_tuple(Failure) ->
    Reason =
        case Failure of
            {error, Inner} -> Inner;
            Other -> Other
        end,
    {error, {unrecoverable_error, Reason}}.
617

618
%% Only `{error, Reason}` results can be recoverable; the decision is made
%% on the reason itself.
is_recoverable_error(Result) ->
    case Result of
        {error, Reason} -> is_recoverable_error_reason(Reason);
        _ -> false
    end.
622

623
%% Error reasons treated as recoverable: an empty pool, a refused or closed
%% connection (with or without a partial body) and `disconnected`.
is_recoverable_error_reason({closed, _PartialBody}) ->
    true;
is_recoverable_error_reason(Reason) ->
    lists:member(Reason, [ecpool_empty, econnrefused, closed, disconnected]).
635

636
%% Emits a snabbkaffe trace point carrying the query result.
%% NOTE(review): the argument keeps its underscore prefix, presumably so no
%% unused-variable warning appears if ?tp expands without using it; confirm.
snabbkaffe_log_return(_Result) ->
    ?tp(clickhouse_connector_query_return, #{result => _Result}).
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc