• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

plausible / ch / f42ac817c0d38bcfad62a32bcbe116f0e6b0f3ff-PR-328

23 Apr 2026 10:24PM UTC coverage: 92.111% (+0.8%) from 91.309%
f42ac817c0d38bcfad62a32bcbe116f0e6b0f3ff-PR-328

Pull #328

github

ruslandoga
Revert "RowBinary: de- and encode dynamic JSON (#296)"

This reverts commit d624ecd72.
Pull Request #328: Revert dynamic JSON RowBinary support

899 of 976 relevant lines covered (92.11%)

82491.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.88
/lib/ch/row_binary.ex
1
defmodule Ch.RowBinary do
2
  @moduledoc "Helpers for working with ClickHouse [RowBinary](https://clickhouse.com/docs/en/interfaces/formats/RowBinary) format."

  # Uncomment to have the compiler report on binary-matching optimizations:
  # @compile {:bin_opt_info, true}

  # The iodata built in this module deliberately uses improper lists such as
  # `[length | binary]`; tell Dialyzer that this is expected.
  @dialyzer :no_improper_lists

  import Bitwise

  # Distance from the Gregorian year-0 origin used by
  # `NaiveDateTime.to_gregorian_seconds/1` and `Date.to_gregorian_days/1`
  # to the Unix epoch (1970-01-01), in seconds and in days.
  @epoch_gregorian_seconds 62_167_219_200
  @epoch_gregorian_days 719_528
11

12
  @doc false
  # Builds the RowBinaryWithNamesAndTypes header as iodata: the column count
  # as a varint, then every column name, then every column type name.
  def encode_names_and_types(names, types) do
    column_count = encode(:varint, length(names))
    encoded_names = encode_many(names, :string)
    encoded_types = encode_types(types)
    [column_count, encoded_names, encoded_types]
  end

  # Writes every type as a String. Binaries are taken as ready-made ClickHouse
  # type names; any other term is rendered with `Ch.Types.encode/1` first.
  defp encode_types([]), do: []

  defp encode_types([type | rest]) do
    type_name = if is_binary(type), do: type, else: Ch.Types.encode(type)
    [encode(:string, type_name) | encode_types(rest)]
  end
6✔
28

29
  @doc """
  Encodes a single row to [RowBinary](https://clickhouse.com/docs/en/interfaces/formats/RowBinary) as iodata.

  `types` may be ClickHouse type names such as `"UInt8"` or already parsed
  type terms; they are normalized before encoding. `row` must hold exactly
  one value per type.

  Examples:

      iex> encode_row([], [])
      []

      iex> encode_row([1], ["UInt8"])
      [1]

      iex> encode_row([3, "hello"], ["UInt8", "String"])
      [3, [5 | "hello"]]

  """
  def encode_row(row, types) do
    normalized_types = encoding_types(types)
    _encode_row(row, normalized_types)
  end

  # Encodes values against already-normalized types, pairing them by position.
  defp _encode_row([], []), do: []

  defp _encode_row([value | values], [type | types]) do
    [encode(type, value) | _encode_row(values, types)]
  end
26✔
50

51
  @doc """
  Encodes multiple rows to [RowBinary](https://clickhouse.com/docs/en/interfaces/formats/RowBinary) as iodata.

  All rows share `types`, which are normalized once up front. The encoded
  values of every row are concatenated into a single flat list.

  Examples:

      iex> encode_rows([], [])
      []

      iex> encode_rows([[1]], ["UInt8"])
      [1]

      iex> encode_rows([[3, "hello"], [4, "hi"]], ["UInt8", "String"])
      [3, [5 | "hello"], 4, [2 | "hi"]]

  """
  def encode_rows(rows, types) do
    normalized_types = encoding_types(types)
    _encode_rows(rows, normalized_types)
  end

  @doc false
  # Walks rows against already-normalized types. The 4-arity helper consumes
  # the current row's values alongside a shrinking copy of the types and then
  # resumes with the remaining rows, producing one flat iodata list.
  def _encode_rows([], _types), do: []

  def _encode_rows([row | remaining_rows], types) do
    _encode_rows(row, types, remaining_rows, types)
  end

  defp _encode_rows([], [], remaining_rows, types), do: _encode_rows(remaining_rows, types)

  defp _encode_rows([value | values], [type | pending], remaining_rows, types) do
    [encode(type, value) | _encode_rows(values, pending, remaining_rows, types)]
  end

  @doc false
  # Normalizes each type with `encoding_type/1`, keeping the list order.
  def encoding_types([]), do: []
  def encoding_types([type | rest]), do: [encoding_type(type) | encoding_types(rest)]
149✔
86

87
  # Normalizes a column type into the term `encode/2` dispatches on. Accepts a
  # ClickHouse type name (parsed with `Ch.Types.decode/1`) or a parsed type
  # term. Raises `ArgumentError` for DateTime/DateTime64 with a non-UTC
  # timezone and for types the encoder does not support.
  defp encoding_type(name) when is_binary(name) do
    name |> Ch.Types.decode() |> encoding_type()
  end

  # Scalar types `encode/2` handles exactly as parsed.
  defp encoding_type(t)
       when t in [
              :string,
              :binary,
              :json,
              :dynamic,
              :boolean,
              :uuid,
              :date,
              :datetime,
              :date32,
              :time,
              :ipv4,
              :ipv6,
              :point,
              :nothing,
              :u8,
              :u16,
              :u32,
              :u64,
              :u128,
              :u256,
              :i8,
              :i16,
              :i32,
              :i64,
              :i128,
              :i256,
              :f32,
              :f64
            ],
       do: t

  # Parameterized types used as-is: FixedString(N), JSON with options and the
  # sized DecimalN(S) variants.
  defp encoding_type({tag, _} = t)
       when tag in [:fixed_string, :json, :decimal32, :decimal64, :decimal128, :decimal256],
       do: t

  # Geo aliases are encoded as (nested) arrays of points.
  defp encoding_type(:ring), do: {:array, :point}
  defp encoding_type(:polygon), do: {:array, {:array, :point}}
  defp encoding_type(:multipolygon), do: {:array, {:array, {:array, :point}}}

  # Only UTC DateTime/DateTime64 columns can be encoded.
  defp encoding_type({:datetime, "UTC"}), do: :datetime

  defp encoding_type({:datetime, tz}) do
    raise ArgumentError, "can't encode DateTime with non-UTC timezone: #{inspect(tz)}"
  end

  defp encoding_type({:datetime64, precision}), do: {:datetime64, time_unit(precision)}
  defp encoding_type({:datetime64, precision, "UTC"}), do: {:datetime64, time_unit(precision)}

  defp encoding_type({:datetime64, _precision, tz}) do
    raise ArgumentError, "can't encode DateTime64 with non-UTC timezone: #{inspect(tz)}"
  end

  defp encoding_type({:time64, precision}), do: {:time64, time_unit(precision)}

  # LowCardinality and SimpleAggregateFunction do not change the value
  # encoding here; only the inner type is kept.
  defp encoding_type({:low_cardinality, inner}), do: encoding_type(inner)
  defp encoding_type({:simple_aggregate_function, _function, inner}), do: encoding_type(inner)

  # Containers keep their tag and normalize their element types.
  defp encoding_type({:array, inner}), do: {:array, encoding_type(inner)}
  defp encoding_type({:nullable, inner}), do: {:nullable, encoding_type(inner)}
  defp encoding_type({:map, key, value}), do: {:map, encoding_type(key), encoding_type(value)}

  defp encoding_type({tag, members}) when tag in [:tuple, :variant] do
    {tag, Enum.map(members, &encoding_type/1)}
  end

  # Enum mappings become a map so value names can be looked up when encoding.
  defp encoding_type({tag, mappings}) when tag in [:enum8, :enum16] do
    {tag, Map.new(mappings)}
  end

  # Generic Decimal(P, S) resolves to the sized DecimalN(S) for its precision.
  defp encoding_type({:decimal, precision, scale}) do
    case decimal_size(precision) do
      32 -> {:decimal32, scale}
      64 -> {:decimal64, scale}
      128 -> {:decimal128, scale}
      256 -> {:decimal256, scale}
    end
  end

  defp encoding_type(unsupported) do
    raise ArgumentError, "unsupported type for encoding: #{inspect(unsupported)}"
  end
181

182
  @doc false
  # Encodes one value of a normalized column type (see `encoding_type/1`) as
  # RowBinary iodata. Many clauses also accept `nil`; see each clause for
  # what it is written as.
  def encode(type, value)

  # Unsigned LEB128 varint. Values below 128 fit in one byte and are returned
  # as a bare integer (valid iodata); larger values need continuation bytes.
  def encode(:varint, n) when is_integer(n) do
    if n < 128, do: n, else: encode_varint_cont(n)
  end

  # String/binary: varint byte length followed by the bytes. Iodata is written
  # as-is and `nil` becomes the empty string.
  def encode(type, str) when type in [:string, :binary] do
    case str do
      nil -> 0
      bytes when is_binary(bytes) -> [encode(:varint, byte_size(bytes)) | bytes]
      iodata when is_list(iodata) -> [encode(:varint, IO.iodata_length(iodata)) | iodata]
    end
  end

  # JSON is serialized with Jason and sent as a String rather than in
  # ClickHouse's native binary JSON form. This assumes the insert runs with
  # `settings: [input_format_binary_read_json_as_string: 1]`.
  # TODO: support the native binary JSON encoding.
  def encode(:json, json), do: encode(:string, Jason.encode_to_iodata!(json))
  def encode({:json, _opts}, json), do: encode(:json, json)
209

210
  # FixedString(N) occupies exactly N bytes: shorter values are right-padded
  # with NUL bytes and `nil` becomes N NUL bytes. Longer or non-binary values
  # are not handled by these clauses.
  def encode({:fixed_string, size}, nil), do: <<0::size(size * 8)>>

  def encode({:fixed_string, size}, str) when byte_size(str) <= size do
    case size - byte_size(str) do
      0 -> str
      padding -> [str | <<0::size(padding * 8)>>]
    end
  end

  # UInt8 — [0 : 255]. The single byte is returned as a bare integer.
  def encode(:u8, nil), do: 0
  def encode(:u8, u) when u in 0..255, do: u
  def encode(:u8, term), do: raise(ArgumentError, "invalid UInt8: #{inspect(term)}")

  # Int8 — [-128 : 127]. Non-negative values are returned as a bare integer;
  # negative ones as their two's-complement byte.
  def encode(:i8, nil), do: 0
  def encode(:i8, i) when is_integer(i) and i >= 0 and i <= 127, do: i
  def encode(:i8, i) when is_integer(i) and i >= -128 and i < 0, do: <<i::signed>>
  def encode(:i8, term), do: raise(ArgumentError, "invalid Int8: #{inspect(term)}")
56✔
237

238
  # Wider integers are fixed-width little-endian. Values are not range
  # checked; the binary segment keeps only the low `bits` bits.
  for bits <- [16, 32, 64, 128, 256] do
    u_type = :"u#{bits}"
    i_type = :"i#{bits}"

    def encode(unquote(u_type), nil), do: <<0::unquote(bits)>>
    def encode(unquote(i_type), nil), do: <<0::unquote(bits)>>
    def encode(unquote(u_type), u) when is_integer(u), do: <<u::unquote(bits)-little>>
    def encode(unquote(i_type), i) when is_integer(i), do: <<i::unquote(bits)-little-signed>>
  end

  # Float32/Float64: IEEE 754, little-endian. Integers are accepted as well.
  for bits <- [32, 64] do
    f_type = :"f#{bits}"

    def encode(unquote(f_type), nil), do: <<0::unquote(bits)>>

    def encode(unquote(f_type), number) when is_number(number) do
      <<number::unquote(bits)-little-signed-float>>
    end
  end

  # Decimal(P, S): choose the storage width from the precision and delegate to
  # the matching DecimalN(S) clause.
  def encode({:decimal, precision, scale}, decimal) do
    sized =
      case decimal_size(precision) do
        32 -> :decimal32
        64 -> :decimal64
        128 -> :decimal128
        256 -> :decimal256
      end

    encode({sized, scale}, decimal)
  end
17✔
272

17✔
273
  # DecimalN(S) is stored as a little-endian N-bit integer equal to the value
  # multiplied by 10^S. `nil` is written as zero.
  for size <- [32, 64, 128, 256] do
    type = :"decimal#{size}"

    def encode({unquote(type), scale} = t, %Decimal{sign: sign, coef: coef, exp: exp} = d) do
      cond do
        # The exponent already matches the column scale: the signed
        # coefficient is the stored integer.
        scale == -exp ->
          i = sign * coef
          <<i::unquote(size)-little>>

        # No fractional digits: scale up by an exact power of ten. This must be
        # integer arithmetic. A float power such as `:math.pow(10, 30)` is not
        # exact beyond 10^22 and would corrupt wide Decimal128/Decimal256 values.
        exp >= 0 ->
          i = sign * coef * Integer.pow(10, exp + scale)
          <<i::unquote(size)-little>>

        # Otherwise round to the column scale and try again.
        true ->
          encode(t, Decimal.round(d, scale))
      end
    end

    def encode({unquote(type), _scale}, nil), do: <<0::unquote(size)>>
  end
293

294
  # Bool: one byte, 1 for `true` and 0 for `false` or `nil`.
  def encode(:boolean, value) when value in [true, false, nil] do
    if value, do: 1, else: 0
  end

  # Array(T): varint element count followed by the elements. Empty and nil
  # arrays are just a zero count.
  def encode({:array, _type}, empty) when empty in [[], nil], do: 0

  def encode({:array, type}, [_ | _] = elements) do
    [encode(:varint, length(elements)) | encode_many(elements, type)]
  end

  # Map(K, V): varint entry count followed by alternating keys and values.
  # Accepts a map or a list of `{key, value}` pairs; empty and nil are a zero
  # count.
  def encode({:map, _key_type, _value_type}, empty) when empty in [[], nil], do: 0

  def encode({:map, _key_type, _value_type} = type, map) when is_map(map) do
    encode(type, Map.to_list(map))
  end

  def encode({:map, key_type, value_type}, [_ | _] = entries) do
    [encode(:varint, length(entries)) | encode_many_kv(entries, key_type, value_type)]
  end
312

313
  # Tuple(T1, ..., TN): the element values back to back, with no header.
  # Accepts a tuple or a list of values; `nil` encodes every element as nil.
  def encode({:tuple, _types} = t, v) when is_tuple(v) do
    encode(t, Tuple.to_list(v))
  end

  def encode({:tuple, types}, values) when is_list(types) and is_list(values) do
    # The element types were already normalized by `encoding_type/1` together
    # with the tuple type itself, so encode against them directly. Going
    # through `encode_row/2` would normalize them a second time, feeding an
    # already-computed time unit back into `time_unit/1` for DateTime64 and
    # Time64 elements. As with `encode_row/2`, mismatched lengths raise.
    _encode_row(values, types)
  end

  def encode({:tuple, types}, nil) when is_list(types) do
    Enum.map(types, &encode(&1, nil))
  end
324

18✔
325
  # Variant(T1, ..., TN): discriminator byte, then the value. 255 marks NULL;
  # otherwise the first member type whose encoder does not raise is used.
  def encode({:variant, types}, value) do
    case value do
      nil -> 255
      _ -> try_encode_variant(types, 0, value)
    end
  end

  # DateTime: unsigned 32-bit little-endian seconds since the Unix epoch.
  # Sub-second precision is dropped; naive datetimes are not shifted.
  def encode(:datetime, nil), do: <<0::32>>

  def encode(:datetime, %DateTime{} = dt) do
    unix_seconds = DateTime.to_unix(dt, :second)
    <<unix_seconds::32-little>>
  end

  def encode(:datetime, %NaiveDateTime{} = naive) do
    {gregorian_seconds, _micros} = NaiveDateTime.to_gregorian_seconds(naive)
    unix_seconds = gregorian_seconds - @epoch_gregorian_seconds
    <<unix_seconds::32-little>>
  end

  # DateTime64(P): signed 64-bit little-endian tick count since the Unix
  # epoch, with `time_unit` ticks per second.
  def encode({:datetime64, _time_unit}, nil), do: <<0::64>>

  def encode({:datetime64, time_unit}, %DateTime{} = dt) do
    ticks = DateTime.to_unix(dt, time_unit)
    <<ticks::64-little-signed>>
  end

  def encode({:datetime64, time_unit}, %NaiveDateTime{} = naive) do
    {gregorian_seconds, micros} = NaiveDateTime.to_gregorian_seconds(naive)
    whole_ticks = (gregorian_seconds - @epoch_gregorian_seconds) * time_unit
    fraction_ticks = div(micros * time_unit, 1_000_000)
    ticks = whole_ticks + fraction_ticks
    <<ticks::64-little-signed>>
  end

  # Date: unsigned 16-bit little-endian days since 1970-01-01.
  def encode(:date, nil), do: <<0::16>>

  def encode(:date, %Date{} = date) do
    days = Date.to_gregorian_days(date) - @epoch_gregorian_days
    <<days::16-little>>
  end

  # Date32: signed 32-bit little-endian days since 1970-01-01.
  def encode(:date32, nil), do: <<0::32>>

  def encode(:date32, %Date{} = date) do
    days = Date.to_gregorian_days(date) - @epoch_gregorian_days
    <<days::32-little-signed>>
  end

  # Time: signed 32-bit little-endian seconds since midnight; microseconds
  # are dropped.
  def encode(:time, nil), do: <<0::32>>

  def encode(:time, %Time{} = time) do
    {seconds, _micros} = Time.to_seconds_after_midnight(time)
    <<seconds::32-little-signed>>
  end
22✔
372

10✔
373
  # Time64(P): signed 64-bit little-endian tick count since midnight, where
  # `time_unit` is the number of ticks per second (presumably 10^P, as
  # produced by `time_unit/1`). `nil` is written as zero.
  def encode({:time64, time_unit}, %Time{} = time) do
    {seconds, micros} = Time.to_seconds_after_midnight(time)
    # Convert the microsecond part to ticks with the same formula as the
    # DateTime64 NaiveDateTime clause. It is correct (truncating) for every
    # precision. Dividing `micros` by `time_unit` only works for time_unit 1_000.
    ticks = seconds * time_unit + div(micros * time_unit, 1_000_000)
    <<ticks::64-little-signed>>
  end

  def encode({:time64, _time_unit}, nil), do: <<0::64>>
388

1✔
389
  # UUID: two 64-bit halves, each little-endian, high half first. Accepts the
  # raw 16-byte form or the 36-character hyphenated hex form (either case).
  def encode(:uuid, nil), do: <<0::128>>
  def encode(:uuid, <<high::64, low::64>>), do: <<high::64-little, low::64-little>>

  def encode(
        :uuid,
        <<g1::8-bytes, ?-, g2::4-bytes, ?-, g3::4-bytes, ?-, g4::4-bytes, ?-, g5::12-bytes>>
      ) do
    hex = g1 <> g2 <> g3 <> g4 <> g5

    raw =
      for <<digit <- hex>>, reduce: <<>> do
        acc -> <<acc::bitstring, d(digit)::4>>
      end

    encode(:uuid, raw)
  end

  # IPv4: the four octets in reverse order (a little-endian UInt32).
  def encode(:ipv4, nil), do: <<0::32>>
  def encode(:ipv4, {_, _, _, _} = address), do: address |> Tuple.to_list() |> Enum.reverse()

  # IPv6: 16 bytes in network order, from eight 16-bit groups or a raw binary.
  def encode(:ipv6, nil), do: <<0::128>>
  def encode(:ipv6, <<_::128>> = raw), do: raw

  def encode(:ipv6, {_, _, _, _, _, _, _, _} = groups) do
    for group <- Tuple.to_list(groups), into: <<>>, do: <<group::16>>
  end

  # Point: x then y, each a Float64. Ring, Polygon and MultiPolygon are arrays
  # of the next simpler shape.
  def encode(:point, nil), do: <<0::128>>
  def encode(:point, {x, y}), do: [encode(:f64, x) | encode(:f64, y)]
  def encode(:ring, points), do: encode({:array, :point}, points)
  def encode(:polygon, rings), do: encode({:array, :ring}, rings)
  def encode(:multipolygon, polygons), do: encode({:array, :polygon}, polygons)
3✔
422

3✔
423
  # Dynamic: the value's type in ClickHouse's binary type encoding, followed by
  # the value in that type's RowBinary form. Only a few Elixir types are
  # inferred: binaries as String, non-negative integers as UInt64, negative
  # integers as Int64, floats as Float64, Date, NaiveDateTime as DateTime, and
  # the empty list.
  # TODO: infer more types.
  # NOTE(review): `[]` is written as the Array(Nothing) type header
  # (0x1E 0x00) with no element-count byte after it; confirm that this is what
  # ClickHouse expects.
  def encode(:dynamic, value) do
    case value do
      str when is_binary(str) -> [0x15 | encode(:string, str)]
      int when is_integer(int) and int >= 0 -> [0x04 | encode(:u64, int)]
      int when is_integer(int) -> [0x0A | encode(:i64, int)]
      float when is_float(float) -> [0x0E | encode(:f64, float)]
      %Date{} = date -> [0x0F | encode(:date, date)]
      %NaiveDateTime{} = naive -> [0x11 | encode(:datetime, naive)]
      [] -> [0x1E, 0x00]
    end
  end

  # Enum8/Enum16 are written as their Int8/Int16 value. The value may be given
  # directly as an integer or as a name looked up in the type's mapping.
  # TODO: support nil for Enum8/Enum16.
  for bits <- [8, 16] do
    enum_type = :"enum#{bits}"
    int_type = :"i#{bits}"

    def encode({unquote(enum_type), mapping}, e) do
      value =
        case e do
          int when is_integer(int) ->
            int

          name when is_binary(name) ->
            case Map.fetch(mapping, name) do
              {:ok, int} ->
                int

              :error ->
                raise ArgumentError,
                      "enum value #{inspect(name)} not found in mapping: #{inspect(mapping)}"
            end
        end

      encode(unquote(int_type), value)
    end
  end

  # Nullable(T): 1 for NULL; otherwise 0 followed by the encoded value.
  def encode({:nullable, type}, value) do
    if is_nil(value) do
      1
    else
      encoded = encode(type, value)
      # Lists and binaries can sit in the tail of the marker cell; a bare
      # integer byte needs a cell of its own to stay valid iodata.
      if is_list(encoded) or is_binary(encoded), do: [0 | encoded], else: [0, encoded]
    end
  end
127✔
470

55✔
471
  # Emits the LEB128 bytes of `n`, lowest 7-bit group first. Every byte except
  # the last has its continuation bit (0x80) set.
  defp encode_varint_cont(n) do
    if n < 128 do
      <<n>>
    else
      [bor(band(n, 0x7F), 0x80) | encode_varint_cont(bsr(n, 7))]
    end
  end

  # Encodes each list element with the same `type`.
  defp encode_many([], _type), do: []
  defp encode_many([value | rest], type), do: [encode(type, value) | encode_many(rest, type)]

  # Encodes `{key, value}` pairs as alternating key and value encodings.
  defp encode_many_kv([], _key_type, _value_type), do: []

  defp encode_many_kv([{key, value} | rest], key_type, value_type) do
    encoded_key = encode(key_type, key)
    encoded_value = encode(value_type, value)
    [encoded_key, encoded_value | encode_many_kv(rest, key_type, value_type)]
  end
489

8✔
490
  # Encodes `value` as the first Variant member type that accepts it, prefixed
  # with that member's index as the discriminator byte. Any exception raised
  # by a member's encoder means "try the next one". Raises `ArgumentError`
  # when no member type accepts the value.
  # TODO find a better way than try/rescue
  defp try_encode_variant([type | types], idx, value) do
    attempt =
      try do
        {:ok, encode(type, value)}
      rescue
        _ -> :error
      end

    case attempt do
      # Lists and binaries may sit in the tail of the discriminator cell.
      {:ok, encoded} when is_list(encoded) or is_binary(encoded) ->
        [idx | encoded]

      # Single-byte encodings (e.g. UInt8, Bool, an empty Array) come back as
      # bare integers; `[idx | 5]` is not valid iodata, so give them a cell
      # of their own, as the Nullable encoder does.
      {:ok, encoded} ->
        [idx, encoded]

      :error ->
        try_encode_variant(types, idx + 1, value)
    end
  end

  defp try_encode_variant([], _idx, value) do
    raise ArgumentError, "no matching type found for encoding #{inspect(value)} as Variant"
  end
×
504

1✔
505
  @compile {:inline, d: 1}

  # Value of one hexadecimal digit character (upper- or lowercase).
  defp d(char) when char in ?0..?9, do: char - ?0
  defp d(char) when char in ?A..?F, do: char - ?A + 10
  defp d(char) when char in ?a..?f, do: char - ?a + 10
529

530
  # Binary patterns for unsigned LEB128 varints of one to eight bytes, each
  # paired with the expression that rebuilds the integer from its 7-bit groups
  # (least significant group first). Both halves are unquoted into the same
  # function heads/bodies below, so the value expression may use the
  # `v1`..`v8` variables bound by its pattern.
  varints = [
    {
      quote(do: <<0::1, v1::7>>),
      quote(do: v1)
    },
    {
      quote(do: <<1::1, v1::7, 0::1, v2::7>>),
      quote(do: (v2 <<< 7) + v1)
    },
    {
      quote(do: <<1::1, v1::7, 1::1, v2::7, 0::1, v3::7>>),
      quote(do: (v3 <<< 14) + (v2 <<< 7) + v1)
    },
    {
      quote(do: <<1::1, v1::7, 1::1, v2::7, 1::1, v3::7, 0::1, v4::7>>),
      quote(do: (v4 <<< 21) + (v3 <<< 14) + (v2 <<< 7) + v1)
    },
    {
      quote(do: <<1::1, v1::7, 1::1, v2::7, 1::1, v3::7, 1::1, v4::7, 0::1, v5::7>>),
      quote(do: (v5 <<< 28) + (v4 <<< 21) + (v3 <<< 14) + (v2 <<< 7) + v1)
    },
    {
      quote do
        <<1::1, v1::7, 1::1, v2::7, 1::1, v3::7, 1::1, v4::7, 1::1, v5::7, 0::1, v6::7>>
      end,
      quote do
        (v6 <<< 35) + (v5 <<< 28) + (v4 <<< 21) + (v3 <<< 14) + (v2 <<< 7) + v1
      end
    },
    {
      quote do
        <<1::1, v1::7, 1::1, v2::7, 1::1, v3::7, 1::1, v4::7, 1::1, v5::7, 1::1, v6::7, 0::1,
          v7::7>>
      end,
      quote do
        (v7 <<< 42) + (v6 <<< 35) + (v5 <<< 28) + (v4 <<< 21) + (v3 <<< 14) + (v2 <<< 7) + v1
      end
    },
    {
      quote do
        <<1::1, v1::7, 1::1, v2::7, 1::1, v3::7, 1::1, v4::7, 1::1, v5::7, 1::1, v6::7, 1::1,
          v7::7, 0::1, v8::7>>
      end,
      quote do
        (v8 <<< 49) + (v7 <<< 42) + (v6 <<< 35) + (v5 <<< 28) + (v4 <<< 21) + (v3 <<< 14) +
          (v2 <<< 7) + v1
      end
    }
  ]
557

42✔
558
  @doc false
  # Incrementally parses a RowBinaryWithNamesAndTypes header: a varint column
  # count, that many names, then that many type names. Returns
  # `{:ok, names, decoding_types, rest}` once complete, or `:more` when the
  # buffer ends before the header does.
  @spec decode_header(binary()) ::
          {:ok, names :: [String.t()], types :: [term], rest :: binary} | :more
  def decode_header(row_binary_with_names_and_types)

  for {varint, column_count} <- varints do
    def decode_header(<<unquote(varint), rest::bytes>>) do
      count = unquote(column_count)
      decode_header_names(rest, count, count, [])
    end
  end

  def decode_header(<<_incomplete::bytes>>), do: :more

  # Collects `left` more column names, each a varint-length-prefixed string.
  defp decode_header_names(<<rest::bytes>>, 0, count, names_reversed) do
    decode_header_types(rest, count, [], Enum.reverse(names_reversed))
  end

  for {varint, name_size} <- varints do
    defp decode_header_names(
           <<unquote(varint), name::size(unquote(name_size))-bytes, rest::bytes>>,
           left,
           count,
           names_reversed
         ) do
      decode_header_names(rest, left - 1, count, [name | names_reversed])
    end
  end

  defp decode_header_names(<<_incomplete::bytes>>, _left, _count, _names_reversed), do: :more

  # Collects `left` more type names, then normalizes them for decoding.
  defp decode_header_types(<<rest::bytes>>, 0, types_reversed, names) do
    {:ok, names, decoding_types_reverse(types_reversed), rest}
  end

  for {varint, type_size} <- varints do
    defp decode_header_types(
           <<unquote(varint), type::size(unquote(type_size))-bytes, rest::bytes>>,
           left,
           types_reversed,
           names
         ) do
      decode_header_types(rest, left - 1, [type | types_reversed], names)
    end
  end

  defp decode_header_types(<<_incomplete::bytes>>, _left, _types_reversed, _names), do: :more
610

611
  @doc """
  Decodes [RowBinaryWithNamesAndTypes](https://clickhouse.com/docs/en/interfaces/formats/RowBinaryWithNamesAndTypes) into rows.

  The column names in the header are skipped; the column types from the
  header drive the decoding of the rows that follow.

  Example:

      iex> decode_rows(<<1, 3, "1+1"::bytes, 5, "UInt8"::bytes, 2>>)
      [[2]]

  """
  def decode_rows(row_binary_with_names_and_types)
  def decode_rows(<<>>), do: []

  for {varint, column_count} <- varints do
    def decode_rows(<<unquote(varint), rest::bytes>>) do
      count = unquote(column_count)
      skip_names(rest, count, count)
    end
  end

  @doc """
  Same as `decode_rows/1` but the first element is a list of column names.

  Example:

      iex> decode_names_and_rows(<<1, 3, "1+1"::bytes, 5, "UInt8"::bytes, 2>>)
      [["1+1"], [2]]

  """
  def decode_names_and_rows(row_binary_with_names_and_types)

  for {varint, column_count} <- varints do
    def decode_names_and_rows(<<unquote(varint), rest::bytes>>) do
      count = unquote(column_count)
      decode_names(rest, count, count, [])
    end
  end

  @doc """
  Decodes [RowBinary](https://clickhouse.com/docs/en/interfaces/formats/RowBinary) into rows.

  `types` may be ClickHouse type names or parsed type terms. Raises
  `ArgumentError` if the data ends in the middle of a row.

  Example:

      iex> decode_rows(<<1>>, ["UInt8"])
      [[1]]

  """
  def decode_rows(row_binary, types)
  def decode_rows(<<>>, _types), do: []

  def decode_rows(<<data::bytes>>, types) do
    normalized_types = decoding_types(types)
    decode_rows!(data, normalized_types)
  end
661

662
  # Decodes a complete RowBinary buffer into rows. If the buffer ends in the
  # middle of a row, raises `ArgumentError` describing what was still pending.
  defp decode_rows!(data, types) do
    {rows, remaining_data, state} = decode_rows(types, data, [], [], types)

    case state do
      {:cont, types_rest, row} ->
        raise ArgumentError, """
        incomplete RowBinary data: ran out of bytes while decoding

        Expected to decode: #{inspect(types_rest)}
        Remaining bytes: #{byte_size(remaining_data)} bytes
        Partial row: #{inspect(row)}
        Completed rows: #{length(rows)}
        """

      nil ->
        rows
    end
  end

  @doc false
  # Continues decoding with the next chunk of data. `state` is `nil` when the
  # previous chunk ended on a row boundary, or `{:cont, pending_types,
  # partial_row}` when it stopped partway through a row.
  def decode_rows_continue(<<data::bytes>>, types, state) do
    case state do
      nil ->
        decode_rows(types, data, [], [], types)

      {:cont, pending_types, partial_row} ->
        decode_rows(pending_types, data, partial_row, [], types)
    end
  end

  @doc false
  # Normalizes each type with `decoding_type/1`, keeping the list order.
  def decoding_types([]), do: []
  def decoding_types([type | rest]), do: [decoding_type(type) | decoding_types(rest)]

  # Normalizes a reversed list of types, restoring the original order.
  defp decoding_types_reverse(types), do: decoding_types_reverse(types, [])

  defp decoding_types_reverse([], normalized), do: normalized

  defp decoding_types_reverse([type | rest], normalized) do
    decoding_types_reverse(rest, [decoding_type(type) | normalized])
  end
703

704
  # Normalizes a column type into the term the row decoder dispatches on.
  # Accepts a ClickHouse type name (parsed with `Ch.Types.decode/1`) or a
  # parsed type term; raises `ArgumentError` for unsupported types.
  defp decoding_type(name) when is_binary(name) do
    name |> Ch.Types.decode() |> decoding_type()
  end

  # Scalar types the decoder handles exactly as parsed.
  defp decoding_type(t)
       when t in [
              :string,
              :binary,
              :json,
              :dynamic,
              :boolean,
              :uuid,
              :date,
              :date32,
              :time,
              :time64,
              :ipv4,
              :ipv6,
              :point,
              :nothing,
              :u8,
              :u16,
              :u32,
              :u64,
              :u128,
              :u256,
              :i8,
              :i16,
              :i32,
              :i64,
              :i128,
              :i256,
              :f32,
              :f64
            ],
       do: t

  defp decoding_type({tag, _} = t) when tag in [:datetime, :fixed_string], do: t

  # A plain DateTime carries no timezone.
  defp decoding_type(:datetime), do: {:datetime, nil}

  defp decoding_type({:datetime64, precision}), do: {:datetime64, time_unit(precision), nil}
  defp decoding_type({:datetime64, precision, tz}), do: {:datetime64, time_unit(precision), tz}
  defp decoding_type({:time64, precision}), do: {:time64, time_unit(precision)}

  # Every Decimal flavour becomes `{:decimal, bit_size, scale}`.
  defp decoding_type({:decimal, precision, scale}), do: {:decimal, decimal_size(precision), scale}

  for bits <- [32, 64, 128, 256] do
    defp decoding_type({unquote(:"decimal#{bits}"), scale}), do: {:decimal, unquote(bits), scale}
  end

  # Containers keep their tag and normalize their element types.
  defp decoding_type({:array, inner}), do: {:array, decoding_type(inner)}
  defp decoding_type({:nullable, inner}), do: {:nullable, decoding_type(inner)}
  defp decoding_type({:map, key, value}), do: {:map, decoding_type(key), decoding_type(value)}

  defp decoding_type({tag, members}) when tag in [:tuple, :variant] do
    {tag, Enum.map(members, &decoding_type/1)}
  end

  # LowCardinality and SimpleAggregateFunction decode as their inner type.
  defp decoding_type({:low_cardinality, inner}), do: decoding_type(inner)
  defp decoding_type({:simple_aggregate_function, _function, inner}), do: decoding_type(inner)

  # Enum mappings are inverted to value => name for lookups while decoding.
  defp decoding_type({tag, mappings}) when tag in [:enum8, :enum16] do
    {tag, Map.new(mappings, fn {name, value} -> {value, name} end)}
  end

  # Geo aliases decode as (nested) arrays of points; JSON options are dropped.
  defp decoding_type(:ring), do: {:array, :point}
  defp decoding_type(:polygon), do: {:array, {:array, :point}}
  defp decoding_type(:multipolygon), do: {:array, {:array, {:array, :point}}}
  defp decoding_type({:json, _opts}), do: :json

  defp decoding_type(unsupported) do
    raise ArgumentError, "unsupported type for decoding: #{inspect(unsupported)}"
  end
783

1,439✔
784
  # Skips `left` header column names, then reads the column types.
  defp skip_names(<<rest::bytes>>, 0, count), do: decode_types(rest, count, [])

  for {varint, name_size} <- varints do
    defp skip_names(
           <<unquote(varint), _name::size(unquote(name_size))-bytes, rest::bytes>>,
           left,
           count
         ) do
      skip_names(rest, left - 1, count)
    end
  end

  # Reads `left` header column names and emits them as the first element of
  # the result, ahead of the decoded rows.
  defp decode_names(<<rest::bytes>>, 0, count, names_reversed) do
    [Enum.reverse(names_reversed) | decode_types(rest, count, [])]
  end

  for {varint, name_size} <- varints do
    defp decode_names(
           <<unquote(varint), name::size(unquote(name_size))-bytes, rest::bytes>>,
           left,
           count,
           names_reversed
         ) do
      decode_names(rest, left - 1, count, [name | names_reversed])
    end
  end

  # Reads `left` header type names. With no row bytes after the header the
  # result is empty; otherwise the remaining bytes are decoded as rows.
  defp decode_types(<<>>, 0, _types_reversed), do: []

  defp decode_types(<<rest::bytes>>, 0, types_reversed) do
    decode_rows!(rest, decoding_types_reverse(types_reversed))
  end

  for {varint, type_size} <- varints do
    defp decode_types(
           <<unquote(varint), type::size(unquote(type_size))-bytes, rest::bytes>>,
           left,
           types_reversed
         ) do
      decode_types(rest, left - 1, [type | types_reversed])
    end
  end
822

823
  @compile inline: [decode_string_decode_rows: 5]

  # String column value: varint length, then that many bytes, repaired to
  # valid UTF-8 with `to_utf8/1`. If the buffer does not yet hold the whole
  # value, decoding is suspended with `:string` pushed back onto the pending
  # types.
  for {varint, str_size} <- varints do
    defp decode_string_decode_rows(
           <<unquote(varint), str::size(unquote(str_size))-bytes, bin::bytes>>,
           types_rest,
           row,
           rows,
           types
         ) do
      decode_rows(types_rest, bin, [to_utf8(str) | row], rows, types)
    end
  end

  defp decode_string_decode_rows(<<bin::bytes>>, types_rest, row, rows, _types) do
    to_be_continued(rows, bin, [:string | types_rest], row)
  end

  @doc false
  # Returns `str` as valid UTF-8: each run of bytes that are not valid UTF-8
  # is replaced with a single U+FFFD replacement character.
  def to_utf8(str) do
    str
    |> to_utf8(0, 0, str, [])
    |> IO.iodata_to_binary()
  end

  @dialyzer {:no_improper_lists, to_utf8: 5, to_utf8_escape: 5}

  # Scans valid UTF-8 while tracking the pending span `original[from, len]`.
  # Copying is deferred so each valid stretch becomes one `binary_part/3`.
  defp to_utf8(<<>>, from, len, original, acc) do
    [acc | binary_part(original, from, len)]
  end

  defp to_utf8(<<valid::utf8, rest::bytes>>, from, len, original, acc) do
    to_utf8(rest, from, len + utf8_size(valid), original, acc)
  end

  defp to_utf8(<<_invalid, rest::bytes>>, from, len, original, acc) do
    copied = [acc | binary_part(original, from, len)]
    to_utf8_escape(rest, from + len, 1, original, copied)
  end

  # Inside a run of invalid bytes, `len` counts them. The whole run becomes a
  # single replacement character once a valid code point or the end is reached.
  defp to_utf8_escape(<<>>, _from, _len, _original, acc), do: [acc | "�"]

  defp to_utf8_escape(<<valid::utf8, rest::bytes>>, from, len, original, acc) do
    to_utf8(rest, from + len, utf8_size(valid), original, [acc | "�"])
  end

  defp to_utf8_escape(<<_invalid, rest::bytes>>, from, len, original, acc) do
    to_utf8_escape(rest, from, len + 1, original, acc)
  end

  # Number of bytes UTF-8 uses for `codepoint` (one to four).
  @compile inline: [utf8_size: 1]
  defp utf8_size(codepoint) when codepoint < 0x80, do: 1
  defp utf8_size(codepoint) when codepoint < 0x800, do: 2
  defp utf8_size(codepoint) when codepoint < 0x10000, do: 3
  defp utf8_size(codepoint) when codepoint < 0x110000, do: 4
881

882
  @compile inline: [decode_string_json_decode_rows: 5]

  # JSON column value sent as a String: varint length, then the JSON text,
  # parsed with `Jason.decode!/1`. Suspends with `:json` pending if the
  # buffer does not yet hold the whole value.
  for {varint, json_size} <- varints do
    defp decode_string_json_decode_rows(
           <<unquote(varint), json::size(unquote(json_size))-bytes, bin::bytes>>,
           types_rest,
           row,
           rows,
           types
         ) do
      decode_rows(types_rest, bin, [Jason.decode!(json) | row], rows, types)
    end
  end

  defp decode_string_json_decode_rows(<<bin::bytes>>, types_rest, row, rows, _types) do
    to_be_continued(rows, bin, [:json | types_rest], row)
  end

  @compile inline: [decode_binary_decode_rows: 5]

  # Same framing as a String, but the bytes are kept as-is (no UTF-8 repair).
  # Suspends with `:binary` pending if the buffer does not yet hold the value.
  for {varint, bytes_size} <- varints do
    defp decode_binary_decode_rows(
           <<unquote(varint), bytes::size(unquote(bytes_size))-bytes, bin::bytes>>,
           types_rest,
           row,
           rows,
           types
         ) do
      decode_rows(types_rest, bin, [bytes | row], rows, types)
    end
  end

  defp decode_binary_decode_rows(<<bin::bytes>>, types_rest, row, rows, _types) do
    to_be_continued(rows, bin, [:binary | types_rest], row)
  end
917

918
  # Array(T): reads the element-count prefix.
  #   * An empty array is pushed directly as `[]`.
  #   * Otherwise `count` copies of the element type are queued, followed by an
  #     `{:array_over, row}` marker. The elements are decoded into a fresh row
  #     and the marker later folds them back into the saved outer row.
  #   * An incomplete prefix suspends decoding with `{:array, type}` re-queued.
  @compile inline: [decode_array_decode_rows: 6]
  defp decode_array_decode_rows(<<0, tail::bytes>>, _type, types_rest, row, rows, types) do
    decode_rows(types_rest, tail, [[] | row], rows, types)
  end

  for {prefix, count} <- varints do
    defp decode_array_decode_rows(
           <<unquote(prefix), tail::bytes>>,
           type,
           types_rest,
           row,
           rows,
           types
         ) do
      element_types = List.duplicate(type, unquote(count))
      queue = element_types ++ [{:array_over, row} | types_rest]
      decode_rows(queue, tail, [], rows, types)
    end
  end

  defp decode_array_decode_rows(<<partial::bytes>>, type, types_rest, row, rows, _types) do
    to_be_continued(rows, partial, [{:array, type} | types_rest], row)
  end
941

942
  # Map(K, V): reads the entry-count prefix.
  #   * An empty map is pushed directly as `%{}`.
  #   * Otherwise `count` alternating key/value types are queued (see
  #     `map_types/3`), followed by a `{:map_over, row}` marker that turns the
  #     decoded values into a map.
  #   * An incomplete prefix suspends decoding with the map type re-queued.
  @compile inline: [decode_map_decode_rows: 7]
  defp decode_map_decode_rows(
         <<0, tail::bytes>>,
         _key_type,
         _value_type,
         types_rest,
         row,
         rows,
         types
       ) do
    decode_rows(types_rest, tail, [%{} | row], rows, types)
  end

  for {prefix, count} <- varints do
    defp decode_map_decode_rows(
           <<unquote(prefix), tail::bytes>>,
           key_type,
           value_type,
           types_rest,
           row,
           rows,
           types
         ) do
      entry_types = map_types(unquote(count), key_type, value_type)
      queue = entry_types ++ [{:map_over, row} | types_rest]
      decode_rows(queue, tail, [], rows, types)
    end
  end

  defp decode_map_decode_rows(
         <<partial::bytes>>,
         key_type,
         value_type,
         types_rest,
         row,
         rows,
         _types
       ) do
    to_be_continued(rows, partial, [{:map, key_type, value_type} | types_rest], row)
  end
975

976
  # Expands a map with `count` entries into the flat type queue
  # `[key_type, value_type, key_type, value_type, ...]` (2 * count elements).
  # Negative or non-integer counts match no clause and raise FunctionClauseError.
  defp map_types(count, key_type, value_type), do: map_types(count, key_type, value_type, [])

  defp map_types(0, _key_type, _value_type, acc), do: acc

  defp map_types(count, key_type, value_type, acc) when count > 0 do
    map_types(count - 1, key_type, value_type, [key_type, value_type | acc])
  end
981

982
  # Parameterless type codes from ClickHouse's binary type encoding, mapped to the
  # internal decoding type that decode_rows/5 understands.
  # https://clickhouse.com/docs/sql-reference/data-types/data-types-binary-encoding
  dynamic_types = [
    # Nothing
    nothing: 0x00,
    # UInt8 .. UInt256
    u8: 0x01,
    u16: 0x02,
    u32: 0x03,
    u64: 0x04,
    u128: 0x05,
    u256: 0x06,
    # Int8 .. Int256
    i8: 0x07,
    i16: 0x08,
    i32: 0x09,
    i64: 0x0A,
    i128: 0x0B,
    i256: 0x0C,
    # Float32, Float64
    f32: 0x0D,
    f64: 0x0E,
    # Date, Date32
    date: 0x0F,
    date32: 0x10,
    # String, UUID, IPv4, IPv6, Bool
    string: 0x15,
    uuid: 0x1D,
    ipv4: 0x28,
    ipv6: 0x29,
    boolean: 0x2D
  ]

  # TODO compile inline?
  # One clause per parameterless code: consume the code byte and push the matching
  # internal type onto the (reversed) wrapper chain collected in `chain`.
  for {internal_type, type_code} <- dynamic_types do
    defp decode_dynamic(
           <<unquote(type_code), tail::bytes>>,
           chain,
           types_rest,
           row,
           rows,
           types
         ) do
      decode_dynamic_continue(
        tail,
        [unquote(internal_type) | chain],
        types_rest,
        row,
        rows,
        types
      )
    end
  end
1022

1023
  # DateTime 0x11 (no explicit time zone)
  defp decode_dynamic(<<0x11, tail::bytes>>, chain, types_rest, row, rows, types) do
    decode_dynamic_continue(tail, [{:datetime, nil} | chain], types_rest, row, rows, types)
  end

  # DateTime(time_zone) 0x12 <var_uint_time_zone_name_size><time_zone_name_data>
  for {prefix, len} <- varints do
    defp decode_dynamic(
           <<0x12, unquote(prefix), tz::size(unquote(len))-bytes, tail::bytes>>,
           chain,
           types_rest,
           row,
           rows,
           types
         ) do
      decode_dynamic_continue(tail, [{:datetime, tz} | chain], types_rest, row, rows, types)
    end
  end

  # DateTime64(P) 0x13 <uint8_precision>
  defp decode_dynamic(<<0x13, precision, tail::bytes>>, chain, types_rest, row, rows, types) do
    datetime64 = decoding_type({:datetime64, precision})
    decode_dynamic_continue(tail, [datetime64 | chain], types_rest, row, rows, types)
  end

  # DateTime64(P, time_zone) 0x14 <uint8_precision><var_uint_time_zone_name_size><time_zone_name_data>
  for {prefix, len} <- varints do
    defp decode_dynamic(
           <<0x14, precision, unquote(prefix), tz::size(unquote(len))-bytes, tail::bytes>>,
           chain,
           types_rest,
           row,
           rows,
           types
         ) do
      datetime64 = decoding_type({:datetime64, precision, tz})
      decode_dynamic_continue(tail, [datetime64 | chain], types_rest, row, rows, types)
    end
  end

  # FixedString(N) 0x16 <var_uint_size>
  for {prefix, len} <- varints do
    defp decode_dynamic(
           <<0x16, unquote(prefix), tail::bytes>>,
           chain,
           types_rest,
           row,
           rows,
           types
         ) do
      fixed_string = {:fixed_string, unquote(len)}
      decode_dynamic_continue(tail, [fixed_string | chain], types_rest, row, rows, types)
    end
  end

  # Decimal32(P, S)  0x19 <uint8_precision><uint8_scale>
  # Decimal64(P, S)  0x1A <uint8_precision><uint8_scale>
  # Decimal128(P, S) 0x1B <uint8_precision><uint8_scale>
  # Decimal256(P, S) 0x1C <uint8_precision><uint8_scale>
  # Only the scale is kept; the storage width is implied by the type code.
  for {type_code, bits} <- [{0x19, 32}, {0x1A, 64}, {0x1B, 128}, {0x1C, 256}] do
    defp decode_dynamic(
           <<unquote(type_code), _precision, scale, tail::bytes>>,
           chain,
           types_rest,
           row,
           rows,
           types
         ) do
      decimal = {:decimal, unquote(bits), scale}
      decode_dynamic_continue(tail, [decimal | chain], types_rest, row, rows, types)
    end
  end

  # Wrapper types, each followed by <nested_type_encoding>:
  #   Array(T)          0x1E
  #   Nullable(T)       0x23
  #   LowCardinality(T) 0x26
  # The wrapper is pushed onto the chain and decode_dynamic_continue/6 keeps
  # reading until the innermost type has been seen.
  for {type_code, wrapper} <- [{0x1E, :array}, {0x23, :nullable}, {0x26, :low_cardinality}] do
    defp decode_dynamic(
           <<unquote(type_code), tail::bytes>>,
           chain,
           types_rest,
           row,
           rows,
           types
         ) do
      decode_dynamic_continue(tail, [unquote(wrapper) | chain], types_rest, row, rows, types)
    end
  end
1141

1142
  # TODO
1143
  # Enum8        0x17 <var_uint_number_of_elements><var_uint_name_size_1><name_data_1><int8_value_1>...<var_uint_name_size_N><name_data_N><int8_value_N>
1144
  # Enum16        0x18 <var_uint_number_of_elements><var_uint_name_size_1><name_data_1><int16_little_endian_value_1>...<var_uint_name_size_N><name_data_N><int16_little_endian_value_N>
1145
  # Tuple(T1, ..., TN)        0x1F <var_uint_number_of_elements><nested_type_encoding_1>...<nested_type_encoding_N>
1146
  # Tuple(name1 T1, ..., nameN TN)        0x20 <var_uint_number_of_elements><var_uint_name_size_1><name_data_1><nested_type_encoding_1>...<var_uint_name_size_N><name_data_N><nested_type_encoding_N>
1147
  # Set        0x21
1148
  # Interval        0x22 <interval_kind> (see interval kind binary encoding)
1149
  # Function        0x24<var_uint_number_of_arguments><argument_type_encoding_1>...<argument_type_encoding_N><return_type_encoding>
1150
  # AggregateFunction(function_name(param_1, ..., param_N), arg_T1, ..., arg_TN)        0x25<var_uint_version><var_uint_function_name_size><function_name_data><var_uint_number_of_parameters><param_1>...<param_N><var_uint_number_of_arguments><argument_type_encoding_1>...<argument_type_encoding_N> (see aggregate function parameter binary encoding)
1151
  # Map(K, V)        0x27<key_type_encoding><value_type_encoding>
1152
  # Variant(T1, ..., TN)        0x2A<var_uint_number_of_variants><variant_type_encoding_1>...<variant_type_encoding_N>
1153
  # Dynamic(max_types=N)        0x2B<uint8_max_types>
1154
  # Custom type (Ring, Polygon, etc)        0x2C<var_uint_type_name_size><type_name_data>
1155
  # SimpleAggregateFunction(function_name(param_1, ..., param_N), arg_T1, ..., arg_TN)        0x2E<var_uint_function_name_size><function_name_data><var_uint_number_of_parameters><param_1>...<param_N><var_uint_number_of_arguments><argument_type_encoding_1>...<argument_type_encoding_N> (see aggregate function parameter binary encoding)
1156
  # Nested(name1 T1, ..., nameN TN)        0x2F<var_uint_number_of_elements><var_uint_name_size_1><name_data_1><nested_type_encoding_1>...<var_uint_name_size_N><name_data_N><nested_type_encoding_N>
1157
  # JSON(max_dynamic_paths=N, max_dynamic_types=M, path Type, SKIP skip_path, SKIP REGEXP skip_path_regexp)        0x30<uint8_serialization_version><var_int_max_dynamic_paths><uint8_max_dynamic_types><var_uint_number_of_typed_paths><var_uint_path_name_size_1><path_name_data_1><encoded_type_1>...<var_uint_number_of_skip_paths><var_uint_skip_path_size_1><skip_path_data_1>...<var_uint_number_of_skip_path_regexps><var_uint_skip_path_regexp_size_1><skip_path_data_regexp_1>...
1158

1159
  # Type codes that are recognized but not decodable yet. Hitting one raises
  # ArgumentError naming the type.
  unsupported_dynamic_types = %{
    "Enum8" => 0x17,
    "Enum16" => 0x18,
    "Tuple" => 0x1F,
    "TupleWithNames" => 0x20,
    "Set" => 0x21,
    "Interval" => 0x22,
    "Function" => 0x24,
    "AggregateFunction" => 0x25,
    "Map" => 0x27,
    "Variant" => 0x2A,
    "Dynamic" => 0x2B,
    "CustomType" => 0x2C,
    "SimpleAggregateFunction" => 0x2E,
    "Nested" => 0x2F,
    "JSON" => 0x30
  }

  for {type_name, type_code} <- unsupported_dynamic_types do
    # the error message is built once at compile time
    message = "unsupported dynamic type #{type_name}"

    defp decode_dynamic(<<unquote(type_code), _::bytes>>, _dynamic, _types_rest, _row, _rows, _types) do
      raise ArgumentError, unquote(message)
    end
  end
212✔
1182

1183
  # Every type code handled by an explicit decode_dynamic/6 clause:
  #   * parameterless codes (`dynamic_types`)
  #   * recognized-but-unsupported codes (`unsupported_dynamic_types`)
  #   * parameterized and wrapper codes: DateTime 0x11-0x14, FixedString 0x16,
  #     Decimal 0x19-0x1C, Array 0x1E, Nullable 0x23, LowCardinality 0x26
  # NOTE: keep this list in sync when adding clauses for new codes.
  known_dynamic_codes =
    Keyword.values(dynamic_types) ++
      Map.values(unsupported_dynamic_types) ++
      [0x11, 0x12, 0x13, 0x14, 0x16, 0x19, 0x1A, 0x1B, 0x1C, 0x1E, 0x23, 0x26]

  # A code that no clause handles can never be decoded, however much input
  # arrives. Falling through to the "need more bytes" clause below would return a
  # continuation that never makes progress, so fail loudly instead.
  defp decode_dynamic(<<code, _::bytes>>, _dynamic, _types_rest, _row, _rows, _types)
       when code not in unquote(known_dynamic_codes) do
    raise ArgumentError, "unknown dynamic type code 0x#{Integer.to_string(code, 16)}"
  end

  # A known code whose parameters are not fully buffered yet (or empty input):
  # suspend and resume with the wrapper chain collected so far.
  defp decode_dynamic(<<bin::bytes>>, dynamic, types_rest, row, rows, _types) do
    to_be_continued(rows, bin, [{:dynamic, dynamic} | types_rest], row)
  end
4✔
1186

212✔
1187
  @compile inline: [decode_dynamic_continue: 6]

  # Called after each type code of a Dynamic value has been consumed. `dynamic`
  # holds the codes seen so far, most recent first.
  #   * If the most recent one is a wrapper (Array / Nullable / LowCardinality),
  #     the nested type encoding follows, so keep reading codes.
  #   * Otherwise the chain is complete: build the nested decoding type and decode
  #     the value with it.
  defp decode_dynamic_continue(<<rest::bytes>>, [wrapper | _] = dynamic, types_rest, row, rows, types)
       when wrapper in [:array, :nullable, :low_cardinality] do
    decode_dynamic(rest, dynamic, types_rest, row, rows, types)
  end

  defp decode_dynamic_continue(<<rest::bytes>>, dynamic, types_rest, row, rows, types) do
    outermost_first = :lists.reverse(dynamic)
    decode_rows([build_dynamic_type(outermost_first) | types_rest], rest, row, rows, types)
  end
1205

1206
  # Turns an outermost-first wrapper chain such as `[:array, :nullable, :string]`
  # into a nested decoding type (`{:array, {:nullable, :string}}`).
  # LowCardinality has no effect on decoding here, so it is dropped.
  defp build_dynamic_type(chain) do
    case chain do
      [type] -> type
      [:array | inner] -> {:array, build_dynamic_type(inner)}
      [:nullable | inner] -> {:nullable, build_dynamic_type(inner)}
      [:low_cardinality | inner] -> build_dynamic_type(inner)
    end
  end
1215

1216
  # Fixed-width types, each decoded by matching a binary pattern at the head of
  # the input. Every entry is one spec or a list of specs
  # (`%{pattern: quoted_pattern, value: quoted_expression}`).
  #   * `value` builds the Elixir term from the variables bound by `pattern`.
  #   * Specs are tried in order, so catch-alls (NaN/Inf floats, invalid booleans)
  #     come last.
  simple_types = %{
    # unsigned integers
    u8: %{pattern: quote(do: <<u>>), value: quote(do: u)},
    u16: %{pattern: quote(do: <<u::16-little>>), value: quote(do: u)},
    u32: %{pattern: quote(do: <<u::32-little>>), value: quote(do: u)},
    u64: %{pattern: quote(do: <<u::64-little>>), value: quote(do: u)},
    u128: %{pattern: quote(do: <<u::128-little>>), value: quote(do: u)},
    u256: %{pattern: quote(do: <<u::256-little>>), value: quote(do: u)},
    # signed integers
    i8: %{pattern: quote(do: <<i::signed>>), value: quote(do: i)},
    i16: %{pattern: quote(do: <<i::16-little-signed>>), value: quote(do: i)},
    i32: %{pattern: quote(do: <<i::32-little-signed>>), value: quote(do: i)},
    i64: %{pattern: quote(do: <<i::64-little-signed>>), value: quote(do: i)},
    i128: %{pattern: quote(do: <<i::128-little-signed>>), value: quote(do: i)},
    i256: %{pattern: quote(do: <<i::256-little-signed>>), value: quote(do: i)},
    # floats: bit patterns that are not finite floats (NaN/Inf) decode as nil
    f32: [
      %{pattern: quote(do: <<f::32-little-float>>), value: quote(do: f)},
      %{pattern: quote(do: <<_nan_or_inf::32>>), value: quote(do: nil)}
    ],
    f64: [
      %{pattern: quote(do: <<f::64-little-float>>), value: quote(do: f)},
      %{pattern: quote(do: <<_nan_or_inf::64>>), value: quote(do: nil)}
    ],
    # UUID: two little-endian 64-bit halves, returned as a 16-byte binary
    uuid: %{
      pattern: quote(do: <<u1::64-little, u2::64-little>>),
      value: quote(do: <<u1::64, u2::64>>)
    },
    # dates: days relative to the Unix epoch
    date: %{
      pattern: quote(do: <<d::16-little>>),
      value: quote(do: Date.from_gregorian_days(d + @epoch_gregorian_days))
    },
    date32: %{
      pattern: quote(do: <<d::32-little-signed>>),
      value: quote(do: Date.from_gregorian_days(d + @epoch_gregorian_days))
    },
    # time: whole seconds after midnight
    time: %{
      pattern: quote(do: <<s::32-little-signed>>),
      value: quote(do: time_after_midnight(s, 1))
    },
    boolean: [
      %{pattern: quote(do: <<0>>), value: quote(do: false)},
      %{pattern: quote(do: <<1>>), value: quote(do: true)},
      %{pattern: quote(do: <<b>>), value: quote(do: raise("invalid boolean value: #{b}"))}
    ],
    # network addresses as :inet-style tuples
    ipv4: %{
      pattern: quote(do: <<b4, b3, b2, b1>>),
      value: quote(do: {b1, b2, b3, b4})
    },
    ipv6: %{
      pattern: quote(do: <<b1::16, b2::16, b3::16, b4::16, b5::16, b6::16, b7::16, b8::16>>),
      value: quote(do: {b1, b2, b3, b4, b5, b6, b7, b8})
    },
    point: %{
      pattern: quote(do: <<x::64-little-float, y::64-little-float>>),
      value: quote(do: {x, y})
    }
  }

  # Generates an inlined `decode_<type>_decode_rows/5` for every entry above:
  #   * one clause per spec;
  #   * a final clause that suspends decoding (re-queuing the type) when too few
  #     bytes are buffered.
  for {type_name, specs} <- simple_types do
    decoder = :"decode_#{type_name}_decode_rows"
    @compile inline: [{decoder, 5}]

    for %{pattern: pattern, value: value} <- List.wrap(specs) do
      defp unquote(decoder)(<<unquote(pattern), tail::bytes>>, types_rest, row, rows, types) do
        decode_rows(types_rest, tail, [unquote(value) | row], rows, types)
      end
    end

    defp unquote(decoder)(<<partial::bytes>>, types_rest, row, rows, _types) do
      to_be_continued(rows, partial, [unquote(type_name) | types_rest], row)
    end
  end
1286

1287
  # Main RowBinary decoding loop, driven by a queue of types.
  #
  #   * `types_rest` - types still to decode. Container types push their element
  #     types here, followed by an internal marker (`{:array_over, row}`,
  #     `{:map_over, row}`, `{:tuple_over, row}`) that assembles the container.
  #   * `bin` - unconsumed input.
  #   * `row` - values decoded so far for the current row or container, reversed.
  #   * `rows` - completed rows, reversed.
  #   * `types` - the full per-row type list, used to start the next row.
  #
  # Returns `{rows, rest, state}`:
  #   * `state` is `nil` when the input ended exactly after a complete row;
  #   * otherwise it is `{:cont, types_rest, row}` (see `to_be_continued/4`).
  defp decode_rows([type | types_rest], <<bin::bytes>>, row, rows, types) do
    case type do
      :u8 ->
        decode_u8_decode_rows(bin, types_rest, row, rows, types)

      :u16 ->
        decode_u16_decode_rows(bin, types_rest, row, rows, types)

      :u32 ->
        decode_u32_decode_rows(bin, types_rest, row, rows, types)

      :u64 ->
        decode_u64_decode_rows(bin, types_rest, row, rows, types)

      :u128 ->
        decode_u128_decode_rows(bin, types_rest, row, rows, types)

      :u256 ->
        decode_u256_decode_rows(bin, types_rest, row, rows, types)

      :i8 ->
        decode_i8_decode_rows(bin, types_rest, row, rows, types)

      :i16 ->
        decode_i16_decode_rows(bin, types_rest, row, rows, types)

      :i32 ->
        decode_i32_decode_rows(bin, types_rest, row, rows, types)

      :i64 ->
        decode_i64_decode_rows(bin, types_rest, row, rows, types)

      :i128 ->
        decode_i128_decode_rows(bin, types_rest, row, rows, types)

      :i256 ->
        decode_i256_decode_rows(bin, types_rest, row, rows, types)

      :f32 ->
        decode_f32_decode_rows(bin, types_rest, row, rows, types)

      :f64 ->
        decode_f64_decode_rows(bin, types_rest, row, rows, types)

      :string ->
        decode_string_decode_rows(bin, types_rest, row, rows, types)

      :binary ->
        decode_binary_decode_rows(bin, types_rest, row, rows, types)

      :json ->
        # assuming it arrives as text and not "native" binary JSON
        # i.e. assumes `settings: [output_format_binary_write_json_as_string: 1]`
        # TODO
        decode_string_json_decode_rows(bin, types_rest, row, rows, types)

      :dynamic ->
        decode_dynamic(bin, _dynamic = [], types_rest, row, rows, types)

      {:dynamic, dynamic} ->
        decode_dynamic(bin, dynamic, types_rest, row, rows, types)

      # TODO utf8?
      {:fixed_string, size} ->
        case bin do
          <<s::size(^size)-bytes, rest::bytes>> ->
            decode_rows(types_rest, rest, [s | row], rows, types)

          _ ->
            to_be_continued(rows, bin, [type | types_rest], row)
        end

      :boolean ->
        decode_boolean_decode_rows(bin, types_rest, row, rows, types)

      :uuid ->
        decode_uuid_decode_rows(bin, types_rest, row, rows, types)

      :date ->
        decode_date_decode_rows(bin, types_rest, row, rows, types)

      :date32 ->
        decode_date32_decode_rows(bin, types_rest, row, rows, types)

      :time ->
        decode_time_decode_rows(bin, types_rest, row, rows, types)

      {:time64, time_unit} ->
        case bin do
          <<ticks::64-little-signed, bin::bytes>> ->
            time = time_after_midnight(ticks, time_unit)
            decode_rows(types_rest, bin, [time | row], rows, types)

          _ ->
            to_be_continued(rows, bin, [type | types_rest], row)
        end

      {:datetime, timezone} ->
        case bin do
          <<s::32-little, bin::bytes>> ->
            dt = DateTime.from_unix!(s)

            dt =
              case timezone do
                nil -> DateTime.to_naive(dt)
                "UTC" -> dt
                _ -> DateTime.shift_zone!(dt, timezone)
              end

            decode_rows(types_rest, bin, [dt | row], rows, types)

          _ ->
            to_be_continued(rows, bin, [type | types_rest], row)
        end

      {:decimal, size, scale} ->
        case bin do
          <<val::size(^size)-little-signed, bin::bytes>> ->
            sign = if val < 0, do: -1, else: 1
            d = Decimal.new(sign, abs(val), -scale)
            decode_rows(types_rest, bin, [d | row], rows, types)

          _ ->
            to_be_continued(rows, bin, [type | types_rest], row)
        end

      {:nullable, inner_type} ->
        case bin do
          <<b, bin::bytes>> ->
            case b do
              0 -> decode_rows([inner_type | types_rest], bin, row, rows, types)
              1 -> decode_rows(types_rest, bin, [nil | row], rows, types)
            end

          _ ->
            to_be_continued(rows, bin, [type | types_rest], row)
        end

      :nothing ->
        decode_rows(types_rest, bin, [nil | row], rows, types)

      {:array, inner_type} ->
        decode_array_decode_rows(bin, inner_type, types_rest, row, rows, types)

      {:array_over, original_row} ->
        decode_rows(types_rest, bin, [:lists.reverse(row) | original_row], rows, types)

      {:map, key_type, value_type} ->
        decode_map_decode_rows(bin, key_type, value_type, types_rest, row, rows, types)

      {:map_over, original_row} ->
        # `row` holds the entries reversed: [v_n, k_n, ..., v_1, k_1]
        map = row |> Enum.chunk_every(2) |> Map.new(fn [v, k] -> {k, v} end)
        decode_rows(types_rest, bin, [map | original_row], rows, types)

      {:tuple, tuple_types} ->
        decode_rows(tuple_types ++ [{:tuple_over, row} | types_rest], bin, [], rows, types)

      {:tuple_over, original_row} ->
        tuple = row |> :lists.reverse() |> List.to_tuple()
        decode_rows(types_rest, bin, [tuple | original_row], rows, types)

      {:variant, variant_types} ->
        case bin do
          <<255, bin::bytes>> ->
            # 255 is the variant type index for "nothing"
            decode_rows(types_rest, bin, [nil | row], rows, types)

          # TODO varint?
          <<variant_type_index::8, bin::bytes>> ->
            case Enum.fetch(variant_types, variant_type_index) do
              {:ok, variant_type} ->
                decode_rows([variant_type | types_rest], bin, row, rows, types)

              :error ->
                # Without this check a `nil` type would be queued. That fails
                # later as an unrelated-looking CaseClauseError.
                raise ArgumentError,
                      "invalid Variant discriminator #{variant_type_index} " <>
                        "for #{length(variant_types)} variant type(s)"
            end

          _ ->
            to_be_continued(rows, bin, [type | types_rest], row)
        end

      {:datetime64, time_unit, timezone} ->
        case bin do
          <<s::64-little-signed, bin::bytes>> ->
            dt = DateTime.from_unix!(s, time_unit)

            dt =
              case timezone do
                nil -> DateTime.to_naive(dt)
                "UTC" -> dt
                _ -> DateTime.shift_zone!(dt, timezone)
              end

            decode_rows(types_rest, bin, [dt | row], rows, types)

          _ ->
            to_be_continued(rows, bin, [type | types_rest], row)
        end

      {:enum8, mapping} ->
        case bin do
          <<v::signed, bin::bytes>> ->
            decode_rows(types_rest, bin, [Map.fetch!(mapping, v) | row], rows, types)

          _ ->
            to_be_continued(rows, bin, [type | types_rest], row)
        end

      {:enum16, mapping} ->
        case bin do
          <<v::16-little-signed, bin::bytes>> ->
            decode_rows(types_rest, bin, [Map.fetch!(mapping, v) | row], rows, types)

          _ ->
            to_be_continued(rows, bin, [type | types_rest], row)
        end

      :ipv4 ->
        decode_ipv4_decode_rows(bin, types_rest, row, rows, types)

      :ipv6 ->
        decode_ipv6_decode_rows(bin, types_rest, row, rows, types)

      :point ->
        decode_point_decode_rows(bin, types_rest, row, rows, types)
    end
  end

  # Row finished and input exhausted: emit all rows in order with no state.
  defp decode_rows([], <<>> = empty, row, rows, _types) do
    rows = :lists.reverse([:lists.reverse(row) | rows])
    {rows, empty, _no_state = nil}
  end

  # With an empty per-row type list no byte can ever be consumed. Without this
  # clause the next one would loop forever, emitting empty rows.
  defp decode_rows([], <<_, _::bytes>>, _row, _rows, [] = _types) do
    raise ArgumentError, "cannot decode RowBinary data: no column types given but input remains"
  end

  # Row finished but input remains: store the row and start the next one.
  defp decode_rows([], <<bin::bytes>>, row, rows, types) do
    row = :lists.reverse(row)
    decode_rows(types, bin, [], [row | rows], types)
  end

  # Mid-row and input exhausted: suspend until more bytes arrive.
  defp decode_rows([_ | _] = types_rest, <<>> = empty, row, rows, _types) do
    to_be_continued(rows, empty, types_rest, row)
  end
1523

62✔
1524
  # Suspends decoding. Returns three things:
  #   * the rows completed so far, in input order;
  #   * the unconsumed input;
  #   * a `{:cont, types_rest, row}` state to resume from once more bytes arrive.
  @compile inline: [to_be_continued: 4]
  defp to_be_continued(rows, bin, types_rest, row) do
    completed = :lists.reverse(rows)
    state = {:cont, types_rest, row}
    {completed, bin, state}
  end
33✔
1528

1529
  # Storage width in bits of a Decimal with the given precision P:
  # P 1..9 -> 32, 10..18 -> 64, 19..38 -> 128, 39+ -> 256.
  # https://clickhouse.com/docs/en/sql-reference/data-types/decimal/
  @compile inline: [decimal_size: 1]
  defp decimal_size(precision) when is_integer(precision) and precision >= 39, do: 256
  defp decimal_size(precision) when is_integer(precision) and precision >= 19, do: 128
  defp decimal_size(precision) when is_integer(precision) and precision >= 10, do: 64
  defp decimal_size(precision) when is_integer(precision), do: 32
1539

182✔
1540
  # Maps a sub-second precision P (0..9) to its tick rate: 10^P ticks per second.
  # Precisions outside 0..9 match no clause.
  @compile inline: [time_unit: 1]
  for {precision, ticks_per_second} <- Enum.zip(0..9, Stream.iterate(1, &(&1 * 10))) do
    defp time_unit(unquote(precision)), do: unquote(ticks_per_second)
  end
1545

8✔
1546
  # Converts `ticks` (units of 1/time_unit seconds since midnight) into a `Time`.
  #
  # ClickHouse Time/Time64 values span [-999:59:59.999999999, 999:59:59.999999999],
  # while Elixir's Time only covers [00:00:00.000000, 23:59:59.999999]. Values
  # outside Elixir's range raise ArgumentError.
  @compile inline: [time_after_midnight: 2]
  defp time_after_midnight(ticks, time_unit) when ticks >= 0 and ticks < 86400 * time_unit do
    ticks
    |> DateTime.from_unix!(time_unit)
    |> DateTime.to_time()
  end

  defp time_after_midnight(ticks, time_unit) do
    # TODO: we could potentially decode ClickHouse's Time/Time64 values as Elixir's Duration when it's out of Elixir's Time range
    seconds = :erlang.float_to_binary(ticks / time_unit, [:short])

    raise ArgumentError,
          "ClickHouse Time value #{seconds} (seconds) is out of Elixir's Time range (00:00:00.000000 - 23:59:59.999999)"
  end
1560
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc