• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

plausible / ch / e4878c1877ab767da1ce7a85de16a01dc5ca2d7c-PR-311

20 Apr 2026 05:28PM UTC coverage: 91.51% (+0.02%) from 91.494%
e4878c1877ab767da1ce7a85de16a01dc5ca2d7c-PR-311

Pull #311

github

ruslandoga
improve naive datetime handling in rowbinary
Pull Request #311: improve naive datetime handling in rowbinary

1 of 1 new or added line in 1 file covered. (100.0%)

36 existing lines in 1 file now uncovered.

927 of 1013 relevant lines covered (91.51%)

79490.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.51
/lib/ch/row_binary.ex
1
defmodule Ch.RowBinary do
2
  @moduledoc "Helpers for working with ClickHouse [RowBinary](https://clickhouse.com/docs/en/interfaces/formats/RowBinary) format."
3

4
  # @compile {:bin_opt_info, true}
5
  @dialyzer :no_improper_lists
6

7
  import Bitwise
8

9
  @epoch_date ~D[1970-01-01]
10
  @epoch_naive_datetime NaiveDateTime.new!(@epoch_date, ~T[00:00:00])
11
  @epoch_utc_datetime DateTime.new!(@epoch_date, ~T[00:00:00])
12

13
  {epoch_seconds_since_gregorian, _} = NaiveDateTime.to_gregorian_seconds(@epoch_naive_datetime)
14
  @epoch_seconds_since_gregorian epoch_seconds_since_gregorian
15

16
  @doc false
17
  def encode_names_and_types(names, types) do
    # Header layout: varint column count, then every column name as a
    # length-prefixed string, then every column type as a length-prefixed string.
    column_count = encode(:varint, length(names))
    encoded_names = encode_many(names, :string)
    [column_count, encoded_names, encode_types(types)]
  end
20

21
  # Encodes each column type as a length-prefixed string. A type that is
  # already a binary is written as-is; any other term is first converted with
  # `Ch.Types.encode/1`.
  defp encode_types([]), do: []

  defp encode_types([head | tail]) do
    type_name = if is_binary(head), do: head, else: Ch.Types.encode(head)
    [encode(:string, type_name) | encode_types(tail)]
  end
8✔
32

33
  @doc """
34
  Encodes a single row to [RowBinary](https://clickhouse.com/docs/en/interfaces/formats/RowBinary) as iodata.
35

36
  Examples:
37

38
      iex> encode_row([], [])
39
      []
40

41
      iex> encode_row([1], ["UInt8"])
42
      [1]
43

44
      iex> encode_row([3, "hello"], ["UInt8", "String"])
45
      [3, [5 | "hello"]]
46

47
  """
48
  def encode_row(row, types) do
    normalized_types = encoding_types(types)
    _encode_row(row, normalized_types)
  end

  # Walks the values and their (already normalized) types in lockstep.
  # Both lists must have the same length: there is no clause for a mismatch.
  defp _encode_row([], []), do: []

  defp _encode_row([value | values], [type | types]) do
    [encode(type, value) | _encode_row(values, types)]
  end
26✔
54

55
  @doc """
56
  Encodes multiple rows to [RowBinary](https://clickhouse.com/docs/en/interfaces/formats/RowBinary) as iodata.
57

58
  Examples:
59

60
      iex> encode_rows([], [])
61
      []
62

63
      iex> encode_rows([[1]], ["UInt8"])
64
      [1]
65

66
      iex> encode_rows([[3, "hello"], [4, "hi"]], ["UInt8", "String"])
67
      [3, [5 | "hello"], 4, [2 | "hi"]]
68

69
  """
70
  def encode_rows(rows, types) do
    normalized_types = encoding_types(types)
    _encode_rows(rows, normalized_types)
  end

  @doc false
  def _encode_rows([], _types), do: []

  def _encode_rows([current | pending], types) do
    _encode_rows(current, types, pending, types)
  end

  # Encodes the values of the current row one by one. `all_types` is carried
  # along untouched so the next row can start again from the full type list.
  defp _encode_rows([], [], pending, all_types), do: _encode_rows(pending, all_types)

  defp _encode_rows([value | values], [type | types_left], pending, all_types) do
    [encode(type, value) | _encode_rows(values, types_left, pending, all_types)]
  end
2,000,216✔
83

84
  @doc false
85
  def encoding_types([]), do: []

  # Normalizes every type in the list with `encoding_type/1`, keeping order.
  def encoding_types([head | tail]) do
    normalized = encoding_type(head)
    [normalized | encoding_types(tail)]
  end
153✔
90

91
  defp encoding_type(type) when is_binary(type) do
92
    encoding_type(Ch.Types.decode(type))
213✔
93
  end
94

95
  defp encoding_type(t)
96
       when t in [
97
              :string,
98
              :binary,
99
              :json,
100
              :dynamic,
101
              :boolean,
102
              :uuid,
103
              :date,
104
              :datetime,
105
              :date32,
106
              :time,
107
              :ipv4,
108
              :ipv6,
109
              :point,
110
              :nothing
111
            ],
112
       do: t
151✔
113

114
  defp encoding_type({:datetime = d, "UTC"}), do: d
×
115

116
  defp encoding_type({:datetime, tz}) do
UNCOV
117
    raise ArgumentError, "can't encode DateTime with non-UTC timezone: #{inspect(tz)}"
×
118
  end
119

120
  defp encoding_type({:fixed_string, _len} = t), do: t
13✔
121

122
  for size <- [8, 16, 32, 64, 128, 256] do
123
    defp encoding_type(unquote(:"u#{size}") = u), do: u
127✔
124
    defp encoding_type(unquote(:"i#{size}") = i), do: i
42✔
125
  end
126

127
  for size <- [32, 64] do
128
    defp encoding_type(unquote(:"f#{size}") = f), do: f
6✔
129
  end
130

131
  defp encoding_type({:array = a, t}), do: {a, encoding_type(t)}
25✔
132

133
  defp encoding_type({:tuple = t, ts}) do
4✔
134
    {t, Enum.map(ts, &encoding_type/1)}
135
  end
136

137
  defp encoding_type({:variant = v, ts}) do
4✔
138
    {v, Enum.map(ts, &encoding_type/1)}
139
  end
140

141
  defp encoding_type({:map = m, kt, vt}) do
142
    {m, encoding_type(kt), encoding_type(vt)}
9✔
143
  end
144

145
  defp encoding_type({:nullable = n, t}), do: {n, encoding_type(t)}
16✔
146
  defp encoding_type({:low_cardinality, t}), do: encoding_type(t)
4✔
147

148
  defp encoding_type({:decimal, p, s}) do
149
    case decimal_size(p) do
×
150
      32 -> {:decimal32, s}
×
UNCOV
151
      64 -> {:decimal64, s}
×
UNCOV
152
      128 -> {:decimal128, s}
×
UNCOV
153
      256 -> {:decimal256, s}
×
154
    end
155
  end
156

157
  defp encoding_type({d, _scale} = t)
158
       when d in [:decimal32, :decimal64, :decimal128, :decimal256],
159
       do: t
6✔
160

161
  defp encoding_type({:datetime64 = t, p}), do: {t, time_unit(p)}
2✔
162

163
  defp encoding_type({:datetime64 = t, p, "UTC"}), do: {t, time_unit(p)}
×
164

165
  defp encoding_type({:datetime64, _, tz}) do
UNCOV
166
    raise ArgumentError, "can't encode DateTime64 with non-UTC timezone: #{inspect(tz)}"
×
167
  end
168

169
  defp encoding_type({:time64 = t, p}), do: {t, time_unit(p)}
8✔
170

171
  defp encoding_type({e, mappings}) when e in [:enum8, :enum16] do
4✔
172
    {e, Map.new(mappings)}
173
  end
174

UNCOV
175
  defp encoding_type({:simple_aggregate_function, _f, t}), do: encoding_type(t)
×
176

177
  defp encoding_type(:ring), do: {:array, :point}
2✔
178
  defp encoding_type(:polygon), do: {:array, {:array, :point}}
2✔
179
  defp encoding_type(:multipolygon), do: {:array, {:array, {:array, :point}}}
2✔
180
  defp encoding_type({:json, opts}), do: {:json, opts}
2✔
181

182
  defp encoding_type(type) do
UNCOV
183
    raise ArgumentError, "unsupported type for encoding: #{inspect(type)}"
×
184
  end
185

186
  @doc false
187
  def encode(type, value)
188

189
  def encode(:varint, i) when is_integer(i) and i < 128, do: i
393✔
190
  def encode(:varint, i) when is_integer(i), do: encode_varint_cont(i)
8✔
191

192
  # Strings are written as a varint byte length followed by the bytes.
  # Both binaries and iolists are accepted; `nil` becomes the single byte 0.
  def encode(type, str) when type in [:string, :binary] do
    case str do
      nil ->
        0

      bin when is_binary(bin) ->
        [encode(:varint, byte_size(bin)) | bin]

      iolist when is_list(iolist) ->
        [encode(:varint, IO.iodata_length(iolist)) | iolist]
    end
  end
199

200
  def encode(:json, json) do
201
    # assuming it can be sent as text and not "native" binary JSON
202
    # i.e. assumes `settings: [input_format_binary_read_json_as_string: 1]`
203
    # TODO
204
    encode(:string, Jason.encode_to_iodata!(json))
8✔
205
  end
206

207
  def encode({:json, _opts}, json) do
208
    # assuming it can be sent as text and not "native" binary JSON
209
    # i.e. assumes `settings: [input_format_binary_read_json_as_string: 1]`
210
    # TODO
211
    encode(:string, Jason.encode_to_iodata!(json))
2✔
212
  end
213

214
  def encode({:fixed_string, size}, str) when byte_size(str) == size do
215
    str
15✔
216
  end
217

218
  def encode({:fixed_string, size}, str) when byte_size(str) < size do
219
    to_pad = size - byte_size(str)
6✔
220
    [str | <<0::size(to_pad * 8)>>]
221
  end
222

223
  def encode({:fixed_string, size}, nil), do: <<0::size(size * 8)>>
1✔
224

225
  # UInt8 — [0 : 255]
226
  def encode(:u8, u) when is_integer(u) and u >= 0 and u <= 255, do: u
111✔
227
  def encode(:u8, nil), do: 0
5✔
228

229
  def encode(:u8, term) do
230
    raise ArgumentError, "invalid UInt8: #{inspect(term)}"
3✔
231
  end
232

233
  # Int8 — [-128 : 127]
234
  def encode(:i8, i) when is_integer(i) and i >= 0 and i <= 127, do: i
15✔
235
  def encode(:i8, i) when is_integer(i) and i < 0 and i >= -128, do: <<i::signed>>
3✔
236
  def encode(:i8, nil), do: 0
1✔
237

238
  def encode(:i8, term) do
239
    raise ArgumentError, "invalid Int8: #{inspect(term)}"
3✔
240
  end
241

242
  for size <- [16, 32, 64, 128, 256] do
243
    def encode(unquote(:"u#{size}"), u) when is_integer(u) do
244
      <<u::unquote(size)-little>>
2,000,066✔
245
    end
246

247
    def encode(unquote(:"i#{size}"), i) when is_integer(i) do
248
      <<i::unquote(size)-little-signed>>
56✔
249
    end
250

251
    def encode(unquote(:"u#{size}"), nil), do: <<0::unquote(size)>>
3✔
252
    def encode(unquote(:"i#{size}"), nil), do: <<0::unquote(size)>>
3✔
253
  end
254

255
  for size <- [32, 64] do
256
    type = :"f#{size}"
257

258
    def encode(unquote(type), f) when is_number(f) do
259
      <<f::unquote(size)-little-signed-float>>
71✔
260
    end
261

262
    def encode(unquote(type), nil), do: <<0::unquote(size)>>
2✔
263
  end
264

265
  def encode({:decimal, precision, scale}, decimal) do
266
    type =
×
267
      case decimal_size(precision) do
268
        32 -> :decimal32
×
UNCOV
269
        64 -> :decimal64
×
UNCOV
270
        128 -> :decimal128
×
271
        256 -> :decimal256
×
272
      end
273

UNCOV
274
    encode({type, scale}, decimal)
×
275
  end
276

277
  for size <- [32, 64, 128, 256] do
    type = :"decimal#{size}"

    # Encodes a `Decimal` as a little-endian integer of `size` bits holding the
    # value scaled by `10^scale`:
    #
    #   * when the decimal's exponent already equals `-scale`, the signed
    #     coefficient is written directly;
    #   * when the exponent is non-negative, the coefficient is multiplied by
    #     `10^(exp + scale)`;
    #   * otherwise the decimal is rounded to `scale` places and encoded again.
    def encode({unquote(type), scale} = t, %Decimal{sign: sign, coef: coef, exp: exp} = d) do
      cond do
        scale == -exp ->
          i = sign * coef
          <<i::unquote(size)-little>>

        exp >= 0 ->
          # Integer.pow/2 keeps the multiplier exact. Going through
          # `:math.pow/2` produces a float, which cannot represent powers of
          # ten above 10^22 exactly and silently corrupts wide decimals.
          # NOTE(review): assumes `scale` is non-negative, so the exponent is too.
          i = sign * coef * Integer.pow(10, exp + scale)
          <<i::unquote(size)-little>>

        true ->
          encode(t, Decimal.round(d, scale))
      end
    end

    # `nil` is written as an all-zero value of the same width.
    def encode({unquote(type), _scale}, nil), do: <<0::unquote(size)>>
  end
297

298
  def encode(:boolean, true), do: 1
4✔
299
  def encode(:boolean, false), do: 0
4✔
300
  def encode(:boolean, nil), do: 0
1✔
301

302
  def encode({:array, type}, [_ | _] = l) do
49✔
303
    [encode(:varint, length(l)) | encode_many(l, type)]
304
  end
305

306
  def encode({:array, _type}, []), do: 0
13✔
307
  def encode({:array, _type}, nil), do: 0
4✔
308

309
  def encode({:map, k, v}, [_ | _] = m) do
14✔
310
    [encode(:varint, length(m)) | encode_many_kv(m, k, v)]
311
  end
312

313
  def encode({:map, _k, _v} = t, m) when is_map(m), do: encode(t, Map.to_list(m))
16✔
314
  def encode({:map, _k, _v}, []), do: 0
8✔
315
  def encode({:map, _k, _v}, nil), do: 0
3✔
316

317
  def encode({:tuple, _types} = t, v) when is_tuple(v) do
318
    encode(t, Tuple.to_list(v))
8✔
319
  end
320

321
  def encode({:tuple, types}, values) when is_list(types) and is_list(values) do
322
    encode_row(values, types)
8✔
323
  end
324

325
  def encode({:tuple, types}, nil) when is_list(types) do
UNCOV
326
    Enum.map(types, fn type -> encode(type, nil) end)
×
327
  end
328

329
  def encode({:variant, _types}, nil), do: 255
4✔
330

331
  def encode({:variant, types}, value) do
332
    try_encode_variant(types, 0, value)
10✔
333
  end
334

335
  def encode(:datetime, %NaiveDateTime{} = datetime) do
336
    {seconds, _micros} = NaiveDateTime.to_gregorian_seconds(datetime)
18✔
337
    # TODO raise if negative?
338
    epoch_seconds = seconds - @epoch_seconds_since_gregorian
18✔
339
    <<epoch_seconds::32-little>>
18✔
340
  end
341

342
  def encode(:datetime, %DateTime{time_zone: "Etc/UTC"} = datetime) do
343
    <<DateTime.to_unix(datetime, :second)::32-little>>
10✔
344
  end
345

346
  def encode(:datetime, %DateTime{} = datetime) do
347
    encode(:datetime, DateTime.shift_zone!(datetime, "Etc/UTC"))
8✔
348
  end
349

350
  def encode(:datetime, nil), do: <<0::32>>
1✔
351

352
  def encode({:datetime64, time_unit}, %NaiveDateTime{} = datetime) do
353
    <<NaiveDateTime.diff(datetime, @epoch_naive_datetime, time_unit)::64-little-signed>>
8✔
354
  end
355

356
  def encode({:datetime64, time_unit}, %DateTime{time_zone: "Etc/UTC"} = datetime) do
357
    <<DateTime.diff(datetime, @epoch_utc_datetime, time_unit)::64-little-signed>>
4✔
358
  end
359

360
  def encode({:datetime64, _time_unit}, %DateTime{} = datetime) do
UNCOV
361
    raise ArgumentError, "non-UTC timezones are not supported for encoding: #{datetime}"
×
362
  end
363

364
  def encode({:datetime64, _time_unit}, nil), do: <<0::64>>
1✔
365

366
  def encode(:date, %Date{} = date) do
367
    <<Date.diff(date, @epoch_date)::16-little>>
13✔
368
  end
369

370
  def encode(:date, nil), do: <<0::16>>
1✔
371

372
  def encode(:date32, %Date{} = date) do
373
    <<Date.diff(date, @epoch_date)::32-little-signed>>
8✔
374
  end
375

376
  def encode(:date32, nil), do: <<0::32>>
1✔
377

378
  def encode(:time, %Time{} = time) do
379
    {s, _micros} = Time.to_seconds_after_midnight(time)
10✔
380
    <<s::32-little-signed>>
10✔
381
  end
382

UNCOV
383
  def encode(:time, nil), do: <<0::32>>
×
384

385
  # Encodes a `Time` as a signed 64-bit little-endian tick count since midnight.
  #
  # `time_unit` is the number of ticks per second (whole seconds are multiplied
  # by it below). `Time.to_seconds_after_midnight/1` returns the sub-second part
  # in microseconds, so it is converted with `micros * time_unit / 1_000_000`,
  # truncating toward zero. Multiplying before dividing keeps the conversion
  # correct for every unit, not just 1_000 and multiples of 1_000_000.
  def encode({:time64, time_unit}, %Time{} = time) do
    {s, micros} = Time.to_seconds_after_midnight(time)
    micros_as_ticks = div(micros * time_unit, 1_000_000)
    ticks = s * time_unit + micros_as_ticks
    <<ticks::64-little-signed>>
  end
398

UNCOV
399
  def encode({:time64, _time_unit}, nil), do: <<0::64>>
×
400

401
  def encode(:uuid, <<u1::64, u2::64>>), do: <<u1::64-little, u2::64-little>>
9✔
402

403
  def encode(
404
        :uuid,
405
        <<a1, a2, a3, a4, a5, a6, a7, a8, ?-, b1, b2, b3, b4, ?-, c1, c2, c3, c4, ?-, d1, d2, d3,
406
          d4, ?-, e1, e2, e3, e4, e5, e6, e7, e8, e9, e10, e11, e12>>
407
      ) do
408
    raw =
1✔
409
      <<d(a1)::4, d(a2)::4, d(a3)::4, d(a4)::4, d(a5)::4, d(a6)::4, d(a7)::4, d(a8)::4, d(b1)::4,
410
        d(b2)::4, d(b3)::4, d(b4)::4, d(c1)::4, d(c2)::4, d(c3)::4, d(c4)::4, d(d1)::4, d(d2)::4,
411
        d(d3)::4, d(d4)::4, d(e1)::4, d(e2)::4, d(e3)::4, d(e4)::4, d(e5)::4, d(e6)::4, d(e7)::4,
412
        d(e8)::4, d(e9)::4, d(e10)::4, d(e11)::4, d(e12)::4>>
413

414
    encode(:uuid, raw)
1✔
415
  end
416

417
  def encode(:uuid, nil), do: <<0::128>>
1✔
418

419
  def encode(:ipv4, {a, b, c, d}), do: [d, c, b, a]
4✔
UNCOV
420
  def encode(:ipv4, nil), do: <<0::32>>
×
421

422
  def encode(:ipv6, {b1, b2, b3, b4, b5, b6, b7, b8}) do
423
    <<b1::16, b2::16, b3::16, b4::16, b5::16, b6::16, b7::16, b8::16>>
4✔
424
  end
425

UNCOV
426
  def encode(:ipv6, <<_::128>> = encoded), do: encoded
×
UNCOV
427
  def encode(:ipv6, nil), do: <<0::128>>
×
428

429
  def encode(:point, {x, y}), do: [encode(:f64, x) | encode(:f64, y)]
28✔
430
  def encode(:point, nil), do: <<0::128>>
1✔
431
  def encode(:ring, points), do: encode({:array, :point}, points)
1✔
432
  def encode(:polygon, rings), do: encode({:array, :ring}, rings)
1✔
433
  def encode(:multipolygon, polygons), do: encode({:array, :polygon}, polygons)
1✔
434

435
  # TODO
436
  # Picks a concrete encoding from the runtime shape of the value and prefixes
  # the encoded value with tag byte(s) for that choice.
  # NOTE(review): the tag bytes presumably follow ClickHouse's binary data-type
  # encoding — confirm against the ClickHouse docs before extending.
  # Clause order matters: non-negative integers are tried before the generic
  # integer clause, and the known structs before the catch-all map clause.
  # Values matching none of the clauses (including non-empty lists) raise.
  def encode(:dynamic, value) do
    case value do
      str when is_binary(str) ->
        [0x15 | encode(:string, str)]

      unsigned when is_integer(unsigned) and unsigned >= 0 ->
        [0x04 | encode(:u64, unsigned)]

      signed when is_integer(signed) ->
        [0x0A | encode(:i64, signed)]

      float when is_float(float) ->
        [0x0E | encode(:f64, float)]

      %Date{} = date ->
        [0x0F | encode(:date, date)]

      %DateTime{} = datetime ->
        [0x11 | encode(:datetime, datetime)]

      %NaiveDateTime{} = naive_datetime ->
        [0x11 | encode(:datetime, naive_datetime)]

      %{} = map ->
        [0x30, 0x00, 0x80, 0x08, 0x20, 0x00, 0x00, 0x00 | encode(:json, map)]

      [] ->
        [0x1E, 0x00]
    end
  end
449

450
  # TODO enum8 and enum16 nil
451
  for {enum_type, int_type} <- [enum8: :i8, enum16: :i16] do
    # An enum value may be given either as its integer code or as its name.
    # Names are looked up in `mapping`; the resulting code is then written
    # with the matching signed integer encoding.
    def encode({unquote(enum_type), mapping}, e) do
      code =
        case e do
          int when is_integer(int) ->
            int

          name when is_binary(name) ->
            case Map.fetch(mapping, name) do
              {:ok, found} ->
                found

              :error ->
                raise ArgumentError,
                      "enum value #{inspect(e)} not found in mapping: #{inspect(mapping)}"
            end
        end

      encode(unquote(int_type), code)
    end
  end
475

476
  def encode({:nullable, _type}, nil), do: 1
12✔
477

478
  # A non-nil nullable value is the byte 0 followed by the value encoded as
  # the inner type.
  def encode({:nullable, type}, value) do
    encoded = encode(type, value)

    if is_list(encoded) or is_binary(encoded) do
      # lists and binaries can be the tail of the iodata cell
      [0 | encoded]
    else
      # anything else (e.g. a single byte as an integer) must be a list element
      [0, encoded]
    end
  end
484

485
  # Emits the integer seven bits at a time, least significant group first.
  # Every byte except the last has its high bit set as a continuation marker.
  defp encode_varint_cont(n) when n >= 128 do
    low_seven_bits = n &&& 0x7F
    [low_seven_bits ||| 0x80 | encode_varint_cont(n >>> 7)]
  end

  defp encode_varint_cont(n), do: <<n>>
490

491
  # Encodes every element of the list with the same type, preserving order.
  defp encode_many([], _type), do: []

  defp encode_many([head | tail], type) do
    [encode(type, head) | encode_many(tail, type)]
  end
57✔
493

494
  # Encodes a list of `{key, value}` pairs as key, value, key, value, ...
  # using one type for all keys and another for all values.
  defp encode_many_kv([], _key_type, _value_type), do: []

  defp encode_many_kv([{k, v} | pairs], key_type, value_type) do
    encoded_key = encode(key_type, k)
    encoded_value = encode(value_type, v)
    [encoded_key, encoded_value | encode_many_kv(pairs, key_type, value_type)]
  end
14✔
503

504
  # TODO find a better way than try/rescue
505
  # Tries the candidate types in order and keeps the first one that encodes
  # the value without raising. The result is prefixed with the zero-based
  # position of that type in the list.
  defp try_encode_variant([], _idx, value) do
    raise ArgumentError, "no matching type found for encoding #{inspect(value)} as Variant"
  end

  defp try_encode_variant([candidate | remaining], idx, value) do
    try do
      encode(candidate, value)
    rescue
      # any exception means "this type does not fit"; move on to the next one
      _e -> try_encode_variant(remaining, idx + 1, value)
    else
      encoded -> [idx | encoded]
    end
  end
518

519
  @compile {:inline, d: 1}
520

521
  defp d(?0), do: 0
1✔
522
  defp d(?1), do: 1
×
523
  defp d(?2), do: 2
1✔
524
  defp d(?3), do: 3
1✔
525
  defp d(?4), do: 4
×
526
  defp d(?5), do: 5
1✔
527
  defp d(?6), do: 6
1✔
528
  defp d(?7), do: 7
×
529
  defp d(?8), do: 8
1✔
530
  defp d(?9), do: 9
1✔
531
  defp d(?A), do: 10
×
532
  defp d(?B), do: 11
×
533
  defp d(?C), do: 12
×
UNCOV
534
  defp d(?D), do: 13
×
UNCOV
535
  defp d(?E), do: 14
×
UNCOV
536
  defp d(?F), do: 15
×
537
  defp d(?a), do: 10
1✔
538
  defp d(?b), do: 11
1✔
539
  defp d(?c), do: 12
1✔
540
  defp d(?d), do: 13
1✔
541
  defp d(?e), do: 14
1✔
542
  defp d(?f), do: 15
1✔
543

544
  varints = [
545
    {_pattern = quote(do: <<0::1, v1::7>>), _value = quote(do: v1)},
546
    {quote(do: <<1::1, v1::7, 0::1, v2::7>>), quote(do: (v2 <<< 7) + v1)},
547
    {quote(do: <<1::1, v1::7, 1::1, v2::7, 0::1, v3::7>>),
548
     quote(do: (v3 <<< 14) + (v2 <<< 7) + v1)},
549
    {quote(do: <<1::1, v1::7, 1::1, v2::7, 1::1, v3::7, 0::1, v4::7>>),
550
     quote(do: (v4 <<< 21) + (v3 <<< 14) + (v2 <<< 7) + v1)},
551
    {quote(do: <<1::1, v1::7, 1::1, v2::7, 1::1, v3::7, 1::1, v4::7, 0::1, v5::7>>),
552
     quote(do: (v5 <<< 28) + (v4 <<< 21) + (v3 <<< 14) + (v2 <<< 7) + v1)},
553
    {quote(do: <<1::1, v1::7, 1::1, v2::7, 1::1, v3::7, 1::1, v4::7, 1::1, v5::7, 0::1, v6::7>>),
554
     quote(do: (v6 <<< 35) + (v5 <<< 28) + (v4 <<< 21) + (v3 <<< 14) + (v2 <<< 7) + v1)},
555
    {quote do
556
       <<1::1, v1::7, 1::1, v2::7, 1::1, v3::7, 1::1, v4::7, 1::1, v5::7, 1::1, v6::7, 0::1,
557
         v7::7>>
558
     end,
559
     quote do
560
       (v7 <<< 42) + (v6 <<< 35) + (v5 <<< 28) + (v4 <<< 21) + (v3 <<< 14) + (v2 <<< 7) + v1
561
     end},
562
    {quote do
563
       <<1::1, v1::7, 1::1, v2::7, 1::1, v3::7, 1::1, v4::7, 1::1, v5::7, 1::1, v6::7, 1::1,
564
         v7::7, 0::1, v8::7>>
565
     end,
566
     quote do
567
       (v8 <<< 49) + (v7 <<< 42) + (v6 <<< 35) + (v5 <<< 28) + (v4 <<< 21) + (v3 <<< 14) +
568
         (v2 <<< 7) + v1
569
     end}
570
  ]
571

572
  @doc false
573
  @spec decode_header(binary()) ::
574
          {:ok, names :: [String.t()], types :: [term], rest :: binary} | :more
575
  def decode_header(row_binary_with_names_and_types)
576

577
  for {pattern, value} <- varints do
578
    def decode_header(<<unquote(pattern), rest::bytes>>) do
579
      decode_header_names(rest, unquote(value), unquote(value), _acc = [])
42✔
580
    end
581
  end
582

583
  def decode_header(<<_bin::bytes>>) do
1✔
584
    :more
585
  end
586

587
  defp decode_header_names(<<rest::bytes>>, 0, count, names) do
588
    decode_header_types(rest, count, _acc = [], :lists.reverse(names))
27✔
589
  end
590

591
  for {pattern, value} <- varints do
592
    defp decode_header_names(
593
           <<unquote(pattern), name::size(unquote(value))-bytes, rest::bytes>>,
594
           left,
595
           count,
596
           acc
597
         ) do
598
      decode_header_names(rest, left - 1, count, [name | acc])
84✔
599
    end
600
  end
601

602
  defp decode_header_names(<<_bin::bytes>>, _left, _count, _acc) do
15✔
603
    :more
604
  end
605

606
  defp decode_header_types(<<rest::bytes>>, 0, types, names) do
607
    {:ok, names, decoding_types_reverse(types), rest}
7✔
608
  end
609

610
  for {pattern, value} <- varints do
611
    defp decode_header_types(
612
           <<unquote(pattern), type::size(unquote(value))-bytes, rest::bytes>>,
613
           count,
614
           acc,
615
           names
616
         ) do
617
      decode_header_types(rest, count - 1, [type | acc], names)
30✔
618
    end
619
  end
620

621
  defp decode_header_types(<<_bin::bytes>>, _count, _acc, _names) do
20✔
622
    :more
623
  end
624

625
  @doc """
626
  Decodes [RowBinaryWithNamesAndTypes](https://clickhouse.com/docs/en/interfaces/formats/RowBinaryWithNamesAndTypes) into rows.
627

628
  Example:
629

630
      iex> decode_rows(<<1, 3, "1+1"::bytes, 5, "UInt8"::bytes, 2>>)
631
      [[2]]
632

633
  """
634
  def decode_rows(row_binary_with_names_and_types)
635
  def decode_rows(<<>>), do: []
1✔
636

637
  for {pattern, value} <- varints do
638
    def decode_rows(<<unquote(pattern), rest::bytes>>) do
639
      skip_names(rest, unquote(value), unquote(value))
7✔
640
    end
641
  end
642

643
  @doc """
644
  Same as `decode_rows/1` but the first element is a list of column names.
645

646
  Example:
647

648
      iex> decode_names_and_rows(<<1, 3, "1+1"::bytes, 5, "UInt8"::bytes, 2>>)
649
      [["1+1"], [2]]
650

651
  """
652
  def decode_names_and_rows(row_binary_with_names_and_types)
653

654
  for {pattern, value} <- varints do
655
    def decode_names_and_rows(<<unquote(pattern), rest::bytes>>) do
656
      decode_names(rest, unquote(value), unquote(value), _acc = [])
1,454✔
657
    end
658
  end
659

660
  @doc """
661
  Decodes [RowBinary](https://clickhouse.com/docs/en/interfaces/formats/RowBinary) into rows.
662

663
  Example:
664

665
      iex> decode_rows(<<1>>, ["UInt8"])
666
      [[1]]
667

668
  """
669
  def decode_rows(row_binary, types)
670
  def decode_rows(<<>>, _types), do: []
1✔
671

672
  def decode_rows(<<data::bytes>>, types) do
673
    decode_rows!(data, decoding_types(types))
13✔
674
  end
675

676
  # Decodes all rows from `data`. If the decoder stops with a continuation
  # state (the input ended in the middle of a row), raises with details about
  # what was still expected; otherwise returns the decoded rows.
  defp decode_rows!(data, types) do
    {decoded, leftover, continuation} = decode_rows(types, data, [], [], types)

    case continuation do
      {:cont, pending_types, partial_row} ->
        raise ArgumentError, """
        incomplete RowBinary data: ran out of bytes while decoding

        Expected to decode: #{inspect(pending_types)}
        Remaining bytes: #{byte_size(leftover)} bytes
        Partial row: #{inspect(partial_row)}
        Completed rows: #{length(decoded)}
        """

      nil ->
        decoded
    end
  end
694

695
  @doc false
696
  def decode_rows_continue(<<data::bytes>>, types, state) do
    # `nil` means no row is in progress: start from the full type list with an
    # empty row. A `{:cont, ...}` state resumes the row that was interrupted.
    {pending_types, partial_row} =
      case state do
        nil -> {types, []}
        {:cont, types_rest, row} -> {types_rest, row}
      end

    decode_rows(pending_types, data, partial_row, [], types)
  end
702

703
  @doc false
704
  def decoding_types([]), do: []

  # Normalizes every type in the list with `decoding_type/1`, keeping order.
  def decoding_types([head | tail]) do
    normalized = decoding_type(head)
    [normalized | decoding_types(tail)]
  end
85✔
709

710
  defp decoding_types_reverse(types), do: decoding_types_reverse(types, [])
1,317✔
711

712
  defp decoding_types_reverse([type | types], acc) do
713
    decoding_types_reverse(types, [decoding_type(type) | acc])
4,174✔
714
  end
715

716
  defp decoding_types_reverse([], acc), do: acc
1,317✔
717

718
  defp decoding_type(t) when is_binary(t) do
719
    decoding_type(Ch.Types.decode(t))
4,289✔
720
  end
721

722
  defp decoding_type(t)
723
       when t in [
724
              :string,
725
              :binary,
726
              :json,
727
              :dynamic,
728
              :boolean,
729
              :uuid,
730
              :date,
731
              :date32,
732
              :time,
733
              :time64,
734
              :ipv4,
735
              :ipv6,
736
              :point,
737
              :nothing
738
            ],
739
       do: t
1,245✔
740

741
  defp decoding_type({:datetime, _tz} = t), do: t
32✔
742
  defp decoding_type({:fixed_string, _len} = t), do: t
22✔
743

744
  for size <- [8, 16, 32, 64, 128, 256] do
745
    defp decoding_type(unquote(:"u#{size}") = u), do: u
2,670✔
746
    defp decoding_type(unquote(:"i#{size}") = i), do: i
98✔
747
  end
748

749
  for size <- [32, 64] do
750
    defp decoding_type(unquote(:"f#{size}") = f), do: f
44✔
751
  end
752

753
  defp decoding_type(:datetime = t), do: {t, _tz = nil}
21✔
754

755
  defp decoding_type({:array = a, t}), do: {a, decoding_type(t)}
124✔
756

757
  defp decoding_type({:tuple = t, ts}) do
18✔
758
    {t, Enum.map(ts, &decoding_type/1)}
759
  end
760

761
  defp decoding_type({:variant = v, ts}) do
28✔
762
    {v, Enum.map(ts, &decoding_type/1)}
763
  end
764

765
  defp decoding_type({:map = m, kt, vt}) do
766
    {m, decoding_type(kt), decoding_type(vt)}
25✔
767
  end
768

769
  defp decoding_type({:nullable = n, t}), do: {n, decoding_type(t)}
58✔
770
  defp decoding_type({:low_cardinality, t}), do: decoding_type(t)
147✔
771

772
  defp decoding_type({:decimal = t, p, s}), do: {t, decimal_size(p), s}
62✔
773
  defp decoding_type({:decimal32, s}), do: {:decimal, 32, s}
1✔
774
  defp decoding_type({:decimal64, s}), do: {:decimal, 64, s}
1✔
775
  defp decoding_type({:decimal128, s}), do: {:decimal, 128, s}
1✔
776
  defp decoding_type({:decimal256, s}), do: {:decimal, 256, s}
1✔
777

778
  defp decoding_type({:datetime64 = t, p}), do: {t, time_unit(p), _tz = nil}
33✔
779
  defp decoding_type({:datetime64 = t, p, tz}), do: {t, time_unit(p), tz}
20✔
780

781
  defp decoding_type({:time64 = t, p}), do: {t, time_unit(p)}
120✔
782

783
  defp decoding_type({e, mappings}) when e in [:enum8, :enum16] do
17✔
784
    {e, Map.new(mappings, fn {k, v} -> {v, k} end)}
34✔
785
  end
786

787
  defp decoding_type({:simple_aggregate_function, _f, t}), do: decoding_type(t)
6✔
788

789
  defp decoding_type(:ring), do: {:array, :point}
6✔
790
  defp decoding_type(:polygon), do: {:array, {:array, :point}}
6✔
791
  defp decoding_type(:multipolygon), do: {:array, {:array, {:array, :point}}}
6✔
792
  defp decoding_type({:json, _opts}), do: :json
6✔
793

794
  defp decoding_type(type) do
UNCOV
795
    raise ArgumentError, "unsupported type for decoding: #{inspect(type)}"
×
796
  end
797

798
  defp skip_names(<<rest::bytes>>, 0, count), do: decode_types(rest, count, _acc = [])
7✔
799

800
  for {pattern, value} <- varints do
801
    defp skip_names(<<unquote(pattern), _::size(unquote(value))-bytes, rest::bytes>>, left, count) do
802
      skip_names(rest, left - 1, count)
77✔
803
    end
804
  end
805

806
  defp decode_names(<<rest::bytes>>, 0, count, names) do
1,454✔
807
    [:lists.reverse(names) | decode_types(rest, count, _acc = [])]
808
  end
809

810
  for {pattern, value} <- varints do
811
    defp decode_names(
812
           <<unquote(pattern), name::size(unquote(value))-bytes, rest::bytes>>,
813
           left,
814
           count,
815
           acc
816
         ) do
817
      decode_names(rest, left - 1, count, [name | acc])
4,242✔
818
    end
819
  end
820

821
  defp decode_types(<<>>, 0, _types), do: []
151✔
822

823
  defp decode_types(<<rest::bytes>>, 0, types) do
824
    decode_rows!(rest, decoding_types_reverse(types))
1,310✔
825
  end
826

827
  for {pattern, value} <- varints do
828
    defp decode_types(
829
           <<unquote(pattern), type::size(unquote(value))-bytes, rest::bytes>>,
830
           count,
831
           acc
832
         ) do
833
      decode_types(rest, count - 1, [type | acc])
4,319✔
834
    end
835
  end
836

837
  @compile inline: [decode_string_decode_rows: 5]
838

839
  for {pattern, size} <- varints do
840
    defp decode_string_decode_rows(
841
           <<unquote(pattern), s::size(unquote(size))-bytes, bin::bytes>>,
842
           types_rest,
843
           row,
844
           rows,
845
           types
846
         ) do
847
      decode_rows(types_rest, bin, [to_utf8(s) | row], rows, types)
1,129✔
848
    end
849
  end
850

851
  defp decode_string_decode_rows(<<bin::bytes>>, types_rest, row, rows, _types) do
852
    to_be_continued(rows, bin, [:string | types_rest], row)
100,165✔
853
  end
854

855
  @doc false
856
  def to_utf8(str) do
    # The worker scans `str` from offset 0 and returns iodata, which is
    # flattened into a single binary here.
    str
    |> to_utf8(0, 0, str, [])
    |> IO.iodata_to_binary()
  end
860

861
  @dialyzer {:no_improper_lists, to_utf8: 5, to_utf8_escape: 5}
862

863
  defp to_utf8(<<valid::utf8, rest::bytes>>, from, len, original, acc) do
864
    to_utf8(rest, from, len + utf8_size(valid), original, acc)
30,139,399✔
865
  end
866

867
  defp to_utf8(<<_invalid, rest::bytes>>, from, len, original, acc) do
868
    acc = [acc | binary_part(original, from, len)]
6✔
869
    to_utf8_escape(rest, from + len, 1, original, acc)
6✔
870
  end
871

872
  defp to_utf8(<<>>, from, len, original, acc) do
1,128✔
873
    [acc | binary_part(original, from, len)]
874
  end
875

876
  defp to_utf8_escape(<<valid::utf8, rest::bytes>>, from, len, original, acc) do
877
    acc = [acc | "�"]
5✔
878
    to_utf8(rest, from + len, utf8_size(valid), original, acc)
5✔
879
  end
880

881
  defp to_utf8_escape(<<_invalid, rest::bytes>>, from, len, original, acc) do
882
    to_utf8_escape(rest, from, len + 1, original, acc)
7✔
883
  end
884

885
  defp to_utf8_escape(<<>>, _from, _len, _original, acc) do
1✔
886
    [acc | "�"]
887
  end
888

889
  # UTF-8 encodes code points in one to four bytes
890
  @compile inline: [utf8_size: 1]
891
  defp utf8_size(codepoint) when codepoint <= 0x7F, do: 1
30,139,368✔
892
  defp utf8_size(codepoint) when codepoint <= 0x7FF, do: 2
13✔
893
  defp utf8_size(codepoint) when codepoint <= 0xFFFF, do: 3
23✔
UNCOV
894
  defp utf8_size(codepoint) when codepoint <= 0x10FFFF, do: 4
×
895

896
  @compile inline: [decode_string_json_decode_rows: 5]

  # JSON delivered as a length-prefixed string. One clause is generated per
  # entry in `varints` (defined earlier in the module): `prefix` is the quoted
  # binary pattern for the length prefix and `len` the quoted expression that
  # yields the payload size in bytes.
  for {prefix, len} <- varints do
    defp decode_string_json_decode_rows(
           <<unquote(prefix), s::size(unquote(len))-bytes, bin::bytes>>,
           types_rest,
           row,
           rows,
           types
         ) do
      decoded = Jason.decode!(s)
      decode_rows(types_rest, bin, [decoded | row], rows, types)
    end
  end

  # No generated clause matched (prefix or payload not fully available):
  # hand back the undecoded bytes with `:json` still pending.
  defp decode_string_json_decode_rows(<<partial::bytes>>, types_rest, row, rows, _types) do
    to_be_continued(rows, partial, [:json | types_rest], row)
  end
913

914
  @compile inline: [decode_binary_decode_rows: 5]

  # Length-prefixed string returned as raw bytes, without any UTF-8 handling.
  # One clause is generated per `varints` entry (`prefix` = quoted pattern of
  # the length prefix, `len` = quoted expression for the payload byte size).
  for {prefix, len} <- varints do
    defp decode_binary_decode_rows(
           <<unquote(prefix), s::size(unquote(len))-bytes, bin::bytes>>,
           types_rest,
           row,
           rows,
           types
         ) do
      row = [s | row]
      decode_rows(types_rest, bin, row, rows, types)
    end
  end

  # Not enough bytes for the prefix plus payload: pause with `:binary` pending.
  defp decode_binary_decode_rows(<<partial::bytes>>, types_rest, row, rows, _types) do
    to_be_continued(rows, partial, [:binary | types_rest], row)
  end
931

932
  @compile inline: [decode_array_decode_rows: 6]

  # Empty array: a zero element count, nothing else to read.
  defp decode_array_decode_rows(<<0, bin::bytes>>, _type, types_rest, row, rows, types) do
    decode_rows(types_rest, bin, [[] | row], rows, types)
  end

  # Non-empty array: the element type is queued once per element, followed by
  # an `{:array_over, row}` marker that carries the row built so far. Elements
  # are then collected into a fresh accumulator until the marker is reached.
  for {prefix, count} <- varints do
    defp decode_array_decode_rows(
           <<unquote(prefix), bin::bytes>>,
           type,
           types_rest,
           row,
           rows,
           types
         ) do
      pending =
        List.duplicate(type, unquote(count)) ++ [{:array_over, row} | types_rest]

      decode_rows(pending, bin, [], rows, types)
    end
  end

  # Element count not readable yet: pause with the whole array still pending.
  defp decode_array_decode_rows(<<partial::bytes>>, type, types_rest, row, rows, _types) do
    to_be_continued(rows, partial, [{:array, type} | types_rest], row)
  end
955

956
  @compile inline: [decode_map_decode_rows: 7]

  # Empty map: a zero pair count, nothing else to read.
  defp decode_map_decode_rows(
         <<0, bin::bytes>>,
         _key_type,
         _value_type,
         types_rest,
         row,
         rows,
         types
       ) do
    decode_rows(types_rest, bin, [%{} | row], rows, types)
  end

  # Non-empty map: key/value types are queued once per pair, followed by a
  # `{:map_over, row}` marker that carries the row built so far.
  for {prefix, count} <- varints do
    defp decode_map_decode_rows(
           <<unquote(prefix), bin::bytes>>,
           key_type,
           value_type,
           types_rest,
           row,
           rows,
           types
         ) do
      pair_types = map_types(unquote(count), key_type, value_type)
      pending = pair_types ++ [{:map_over, row} | types_rest]
      decode_rows(pending, bin, [], rows, types)
    end
  end

  # Pair count not readable yet: pause with the whole map still pending.
  defp decode_map_decode_rows(
         <<partial::bytes>>,
         key_type,
         value_type,
         types_rest,
         row,
         rows,
         _types
       ) do
    to_be_continued(rows, partial, [{:map, key_type, value_type} | types_rest], row)
  end
989

990
  # Builds the flat type list for `count` map entries:
  # [key_type, value_type, key_type, value_type, ...]
  defp map_types(0, _key_type, _value_types), do: []

  defp map_types(remaining, key_type, value_type) when remaining > 0 do
    tail = map_types(remaining - 1, key_type, value_type)
    [key_type, value_type | tail]
  end
32✔
995

996
  # Type codes used in the binary encoding of data types, see
  # https://clickhouse.com/docs/sql-reference/data-types/data-types-binary-encoding
  #
  # This table only lists types that are fully described by their one-byte
  # code; each entry maps the decoder's type atom to that code.
  dynamic_types = [
    nothing: 0x00,
    # unsigned integers
    u8: 0x01,
    u16: 0x02,
    u32: 0x03,
    u64: 0x04,
    u128: 0x05,
    u256: 0x06,
    # signed integers
    i8: 0x07,
    i16: 0x08,
    i32: 0x09,
    i64: 0x0A,
    i128: 0x0B,
    i256: 0x0C,
    # floats
    f32: 0x0D,
    f64: 0x0E,
    # dates
    date: 0x0F,
    date32: 0x10,
    # everything else
    string: 0x15,
    uuid: 0x1D,
    ipv4: 0x28,
    ipv6: 0x29,
    boolean: 0x2D
  ]

  # TODO compile inline?

  # One `decode_dynamic/6` clause per table entry: consume the code byte and
  # push the matching type onto the `dynamic` type stack.
  for {decoded_type, type_code} <- dynamic_types do
    defp decode_dynamic(
           <<unquote(type_code), rest::bytes>>,
           dynamic,
           types_rest,
           row,
           rows,
           types
         ) do
      dynamic = [unquote(decoded_type) | dynamic]
      decode_dynamic_continue(rest, dynamic, types_rest, row, rows, types)
    end
  end
1036

1037
  # DateTime 0x11
  # No time zone in the header, so the type is recorded with a `nil` zone.
  defp decode_dynamic(<<0x11, rest::bytes>>, dynamic, types_rest, row, rows, types) do
    dynamic = [{:datetime, nil} | dynamic]
    decode_dynamic_continue(rest, dynamic, types_rest, row, rows, types)
  end

  # DateTime(time_zone) 0x12 <var_uint_time_zone_name_size><time_zone_name_data>
  # The zone name is a varint-length-prefixed string; one clause per `varints`
  # entry (`prefix` = quoted prefix pattern, `len` = quoted name size).
  for {prefix, len} <- varints do
    defp decode_dynamic(
           <<0x12, unquote(prefix), tz::size(unquote(len))-bytes, rest::bytes>>,
           dynamic,
           types_rest,
           row,
           rows,
           types
         ) do
      dynamic = [{:datetime, tz} | dynamic]
      decode_dynamic_continue(rest, dynamic, types_rest, row, rows, types)
    end
  end
1055

1056
  # DateTime64(P) 0x13 <uint8_precision>
  # The `{:datetime64, precision}` type is passed through `decoding_type/1`
  # before being pushed onto the type stack.
  defp decode_dynamic(
         <<0x13, precision, rest::bytes>>,
         dynamic,
         types_rest,
         row,
         rows,
         types
       ) do
    datetime64 = decoding_type({:datetime64, precision})
    decode_dynamic_continue(rest, [datetime64 | dynamic], types_rest, row, rows, types)
  end

  # DateTime64(P, time_zone) 0x14 <uint8_precision><var_uint_time_zone_name_size><time_zone_name_data>
  # Same as above with a varint-length-prefixed zone name after the precision.
  for {prefix, len} <- varints do
    defp decode_dynamic(
           <<0x14, precision, unquote(prefix), tz::size(unquote(len))-bytes, rest::bytes>>,
           dynamic,
           types_rest,
           row,
           rows,
           types
         ) do
      datetime64 = decoding_type({:datetime64, precision, tz})
      decode_dynamic_continue(rest, [datetime64 | dynamic], types_rest, row, rows, types)
    end
  end
1095

1096
  # FixedString(N) 0x16 <var_uint_size>
  # Only the size N is part of the header here; the string bytes themselves
  # are read later when the `{:fixed_string, n}` type is decoded.
  for {prefix, n} <- varints do
    defp decode_dynamic(
           <<0x16, unquote(prefix), rest::bytes>>,
           dynamic,
           types_rest,
           row,
           rows,
           types
         ) do
      dynamic = [{:fixed_string, unquote(n)} | dynamic]
      decode_dynamic_continue(rest, dynamic, types_rest, row, rows, types)
    end
  end
1116

1117
  # Decimal32(P, S) 0x19 <uint8_precision><uint8_scale>
  # Decimal64(P, S) 0x1A <uint8_precision><uint8_scale>
  # Decimal128(P, S) 0x1B <uint8_precision><uint8_scale>
  # Decimal256(P, S) 0x1C <uint8_precision><uint8_scale>
  #
  # The storage width comes from the type code, so the precision byte is
  # skipped; only the scale is kept in `{:decimal, bits, scale}`.
  for {type_code, bits} <- [{0x19, 32}, {0x1A, 64}, {0x1B, 128}, {0x1C, 256}] do
    defp decode_dynamic(
           <<unquote(type_code), _precision, scale, rest::bytes>>,
           dynamic,
           types_rest,
           row,
           rows,
           types
         ) do
      dynamic = [{:decimal, unquote(bits), scale} | dynamic]
      decode_dynamic_continue(rest, dynamic, types_rest, row, rows, types)
    end
  end
1140

1141
  # Wrapper types: the code byte is followed by the encoding of the nested
  # type, so only a marker is pushed here and decoding of the header goes on.
  #
  # Array(T)          0x1E <nested_type_encoding>
  # Nullable(T)       0x23 <nested_type_encoding>
  # LowCardinality(T) 0x26 <nested_type_encoding>
  for {type_code, wrapper} <- [{0x1E, :array}, {0x23, :nullable}, {0x26, :low_cardinality}] do
    defp decode_dynamic(
           <<unquote(type_code), rest::bytes>>,
           dynamic,
           types_rest,
           row,
           rows,
           types
         ) do
      dynamic = [unquote(wrapper) | dynamic]
      decode_dynamic_continue(rest, dynamic, types_rest, row, rows, types)
    end
  end
1155

1156
  # JSON(max_dynamic_paths=N, max_dynamic_types=M, path Type, SKIP skip_path, SKIP REGEXP skip_path_regexp) 0x30<uint8_serialization_version><var_int_max_dynamic_paths><uint8_max_dynamic_types><var_uint_number_of_typed_paths><var_uint_path_name_size_1><path_name_data_1><encoded_type_1>...<var_uint_number_of_skip_paths><var_uint_skip_path_size_1><skip_path_data_1>...<var_uint_number_of_skip_path_regexps><var_uint_skip_path_regexp_size_1><skip_path_data_regexp_1>...
  #
  # The JSON type parameters are not used for decoding: the whole header is
  # read only to find where it ends, then `:json` is pushed as the type.
  # The header is parsed with hard matches, so a header that does not match
  # (including one cut short) raises instead of pausing.
  defp decode_dynamic(<<0x30, rest::bytes>>, dynamic, types_rest, row, rows, types) do
    # drops one varint-length-prefixed string (a path name or a regexp)
    skip_string = fn bin ->
      {len, bin} = read_varint(bin)
      <<_discard::size(len)-bytes, bin::bytes>> = bin
      bin
    end

    # uint8_serialization_version must be 0
    <<0x00, rest::bytes>> = rest

    # var_int_max_dynamic_paths and uint8_max_dynamic_types are ignored
    {_max_dynamic_paths, rest} = read_varint(rest)
    <<_max_dynamic_types, rest::bytes>> = rest

    # typed paths: a path name followed by its encoded type
    {typed_paths, rest} = read_varint(rest)

    rest =
      Enum.reduce(1..typed_paths//1, rest, fn _, bin ->
        skip_type(skip_string.(bin))
      end)

    # skipped paths: just names
    {skip_paths, rest} = read_varint(rest)
    rest = Enum.reduce(1..skip_paths//1, rest, fn _, bin -> skip_string.(bin) end)

    # skipped path regexps: just patterns
    {skip_path_regexes, rest} = read_varint(rest)
    rest = Enum.reduce(1..skip_path_regexes//1, rest, fn _, bin -> skip_string.(bin) end)

    decode_dynamic_continue(rest, [:json | dynamic], types_rest, row, rows, types)
  end
1202

1203
  # Reads one varint from the front of a binary and returns `{value, rest}`.
  # One clause per `varints` entry; there is no fallback clause, so input
  # that matches none of them raises FunctionClauseError.
  for {prefix, decoded} <- varints do
    defp read_varint(<<unquote(prefix), rest::bytes>>) do
      {unquote(decoded), rest}
    end
  end
1206

1207
  # Single-byte type codes that can be skipped even though `dynamic_types`
  # does not list them.
  other_dynamic_types = [datetime: 0x11, set: 0x21, bfloat16: 0x31, time: 0x32]

  # Consume a type header from binary input, returning the rest.
  # TODO: Only supports single-byte type headers for now.
  def skip_type(<<type, rest::bytes>>)
      when type in unquote(Keyword.values(dynamic_types ++ other_dynamic_types)) do
    rest
  end

  # Any other leading byte is a type whose header cannot be skipped here.
  def skip_type(<<type, _::bytes>>) do
    hex = Base.encode16(<<type>>)

    raise ArgumentError,
          "Unsupported type definition (starting with 0x#{hex}) while decoding dynamic JSON. Only single-byte type identifiers are currently supported."
  end
1223

1224
  # TODO
1225
  # Enum8        0x17 <var_uint_number_of_elements><var_uint_name_size_1><name_data_1><int8_value_1>...<var_uint_name_size_N><name_data_N><int8_value_N>
1226
  # Enum16        0x18 <var_uint_number_of_elements><var_uint_name_size_1><name_data_1><int16_little_endian_value_1>...<var_uint_name_size_N><name_data_N><int16_little_endian_value_N>
1227
  # Tuple(T1, ..., TN)        0x1F <var_uint_number_of_elements><nested_type_encoding_1>...<nested_type_encoding_N>
1228
  # Tuple(name1 T1, ..., nameN TN)        0x20 <var_uint_number_of_elements><var_uint_name_size_1><name_data_1><nested_type_encoding_1>...<var_uint_name_size_N><name_data_N><nested_type_encoding_N>
1229
  # Set        0x21
1230
  # Interval        0x22 <interval_kind> (see interval kind binary encoding)
1231
  # Function        0x24<var_uint_number_of_arguments><argument_type_encoding_1>...<argument_type_encoding_N><return_type_encoding>
1232
  # AggregateFunction(function_name(param_1, ..., param_N), arg_T1, ..., arg_TN)        0x25<var_uint_version><var_uint_function_name_size><function_name_data><var_uint_number_of_parameters><param_1>...<param_N><var_uint_number_of_arguments><argument_type_encoding_1>...<argument_type_encoding_N> (see aggregate function parameter binary encoding)
1233
  # Map(K, V)        0x27<key_type_encoding><value_type_encoding>
1234
  # Variant(T1, ..., TN)        0x2A<var_uint_number_of_variants><variant_type_encoding_1>...<variant_type_encoding_N>
1235
  # Dynamic(max_types=N)        0x2B<uint8_max_types>
1236
  # Custom type (Ring, Polygon, etc)        0x2C<var_uint_type_name_size><type_name_data>
1237
  # SimpleAggregateFunction(function_name(param_1, ..., param_N), arg_T1, ..., arg_TN)        0x2E<var_uint_function_name_size><function_name_data><var_uint_number_of_parameters><param_1>...<param_N><var_uint_number_of_arguments><argument_type_encoding_1>...<argument_type_encoding_N> (see aggregate function parameter binary encoding)
1238
  # Nested(name1 T1, ..., nameN TN)        0x2F<var_uint_number_of_elements><var_uint_name_size_1><name_data_1><nested_type_encoding_1>...<var_uint_name_size_N><name_data_N><nested_type_encoding_N>
1239

1240
  # Type codes that are recognised but not decodable yet, keyed by the name
  # used in the error message.
  unsupported_dynamic_types = %{
    "Enum8" => 0x17,
    "Enum16" => 0x18,
    "Tuple" => 0x1F,
    "TupleWithNames" => 0x20,
    "Set" => 0x21,
    "Interval" => 0x22,
    "Function" => 0x24,
    "AggregateFunction" => 0x25,
    "Map" => 0x27,
    "Variant" => 0x2A,
    "Dynamic" => 0x2B,
    "CustomType" => 0x2C,
    "SimpleAggregateFunction" => 0x2E,
    "Nested" => 0x2F
  }

  # One raising clause per unsupported code; the message is assembled at
  # compile time since it only depends on the table above.
  for {name, code} <- unsupported_dynamic_types do
    message = "unsupported dynamic type #{name}"

    defp decode_dynamic(<<unquote(code), _::bytes>>, _dynamic, _types_rest, _row, _rows, _types) do
      raise ArgumentError, unquote(message)
    end
  end
1262

1263
  # Catch-all: no type-header clause matched, so decoding is paused with the
  # partially built `dynamic` type stack saved as `{:dynamic, dynamic}`.
  # NOTE(review): a leading byte that has no dedicated clause also ends up
  # here and is treated like incomplete input - confirm this is intended.
  defp decode_dynamic(<<partial::bytes>>, dynamic, types_rest, row, rows, _types) do
    pending = [{:dynamic, dynamic} | types_rest]
    to_be_continued(rows, partial, pending, row)
  end
1266

1267
  @compile inline: [decode_dynamic_continue: 6]

  # Called after each piece of a dynamic type header has been pushed onto
  # `dynamic` (most recent first). Wrapper markers still need their nested
  # type, so the header keeps being read; anything else completes the type,
  # which is then built and queued as the next type to decode.
  defp decode_dynamic_continue(<<rest::bytes>>, dynamic, types_rest, row, rows, types) do
    case dynamic do
      [wrapper | _] when wrapper in [:array, :nullable, :low_cardinality] ->
        decode_dynamic(rest, dynamic, types_rest, row, rows, types)

      _ ->
        type = build_dynamic_type(:lists.reverse(dynamic))
        decode_rows([type | types_rest], rest, row, rows, types)
    end
  end
1285

1286
  # Folds a type stack given outermost-first (e.g. [:array, :nullable, :u8])
  # into a nested decoding type ({:array, {:nullable, :u8}}).
  # `:low_cardinality` is dropped: the nested type is used as is.
  defp build_dynamic_type(stack) do
    case stack do
      [innermost] -> innermost
      [:array | nested] -> {:array, build_dynamic_type(nested)}
      [:nullable | nested] -> {:nullable, build_dynamic_type(nested)}
      [:low_cardinality | nested] -> build_dynamic_type(nested)
    end
  end
1295

1296
  # Fixed-layout types. Each entry gives the quoted binary pattern that
  # matches one value and the quoted expression that produces the decoded
  # term. A list of entries becomes several clauses, tried in order.
  simple_types = %{
    # unsigned integers
    u8: %{pattern: quote(do: <<u>>), value: quote(do: u)},
    u16: %{pattern: quote(do: <<u::16-little>>), value: quote(do: u)},
    u32: %{pattern: quote(do: <<u::32-little>>), value: quote(do: u)},
    u64: %{pattern: quote(do: <<u::64-little>>), value: quote(do: u)},
    u128: %{pattern: quote(do: <<u::128-little>>), value: quote(do: u)},
    u256: %{pattern: quote(do: <<u::256-little>>), value: quote(do: u)},
    # signed integers
    i8: %{pattern: quote(do: <<i::signed>>), value: quote(do: i)},
    i16: %{pattern: quote(do: <<i::16-little-signed>>), value: quote(do: i)},
    i32: %{pattern: quote(do: <<i::32-little-signed>>), value: quote(do: i)},
    i64: %{pattern: quote(do: <<i::64-little-signed>>), value: quote(do: i)},
    i128: %{pattern: quote(do: <<i::128-little-signed>>), value: quote(do: i)},
    i256: %{pattern: quote(do: <<i::256-little-signed>>), value: quote(do: i)},
    # floats: bit patterns the float match rejects are decoded as nil
    f32: [
      %{pattern: quote(do: <<f::32-little-float>>), value: quote(do: f)},
      %{pattern: quote(do: <<_nan_or_inf::32>>), value: quote(do: nil)}
    ],
    f64: [
      %{pattern: quote(do: <<f::64-little-float>>), value: quote(do: f)},
      %{pattern: quote(do: <<_nan_or_inf::64>>), value: quote(do: nil)}
    ],
    # two little-endian 64-bit halves, re-emitted big-endian as 16 bytes
    uuid: %{
      pattern: quote(do: <<u1::64-little, u2::64-little>>),
      value: quote(do: <<u1::64, u2::64>>)
    },
    # day counts relative to @epoch_date
    date: %{
      pattern: quote(do: <<d::16-little>>),
      value: quote(do: Date.add(@epoch_date, d))
    },
    date32: %{
      pattern: quote(do: <<d::32-little-signed>>),
      value: quote(do: Date.add(@epoch_date, d))
    },
    # whole seconds, hence a time unit of 1
    time: %{
      pattern: quote(do: <<s::32-little-signed>>),
      value: quote(do: time_after_midnight(s, 1))
    },
    boolean: [
      %{pattern: quote(do: <<0>>), value: quote(do: false)},
      %{pattern: quote(do: <<1>>), value: quote(do: true)},
      %{pattern: quote(do: <<b>>), value: quote(do: raise("invalid boolean value: #{b}"))}
    ],
    # bytes are matched in reverse order of the resulting tuple
    ipv4: %{
      pattern: quote(do: <<b4, b3, b2, b1>>),
      value: quote(do: {b1, b2, b3, b4})
    },
    ipv6: %{
      pattern: quote(do: <<b1::16, b2::16, b3::16, b4::16, b5::16, b6::16, b7::16, b8::16>>),
      value: quote(do: {b1, b2, b3, b4, b5, b6, b7, b8})
    },
    point: %{
      pattern: quote(do: <<x::64-little-float, y::64-little-float>>),
      value: quote(do: {x, y})
    }
  }

  # Generates an inlined `decode_<type>_decode_rows/5` per table entry: one
  # clause for every pattern, plus a final clause that pauses decoding when
  # none of the patterns matched the available bytes.
  for {type, spec} <- simple_types do
    decoder = :"decode_#{type}_decode_rows"
    @compile inline: [{decoder, 5}]

    for %{pattern: pattern, value: value} <- List.wrap(spec) do
      defp unquote(decoder)(<<unquote(pattern), rest::bytes>>, types_rest, row, rows, types) do
        row = [unquote(value) | row]
        decode_rows(types_rest, rest, row, rows, types)
      end
    end

    defp unquote(decoder)(<<partial::bytes>>, types_rest, row, rows, _types) do
      to_be_continued(rows, partial, [unquote(type) | types_rest], row)
    end
  end
1366

1367
  # Decodes the next value of the current row.
  #
  #   * `[type | types_rest]` - types still to decode for this row
  #   * `bin`                 - undecoded input
  #   * `row`                 - values decoded so far, most recent first
  #   * `rows`                - finished rows, most recent first
  #   * `types`               - full type list, used to start the next row
  #
  # Every branch either recurses with the value pushed onto `row`, or calls
  # `to_be_continued/4` with the current type put back when `bin` does not
  # hold a complete value. Composite types (arrays, maps, tuples) queue their
  # element types followed by an `*_over` marker carrying the outer row, and
  # collect elements in a fresh `row` until the marker is reached.
  defp decode_rows([type | types_rest], <<bin::bytes>>, row, rows, types) do
    case type do
      :u8 -> decode_u8_decode_rows(bin, types_rest, row, rows, types)
      :u16 -> decode_u16_decode_rows(bin, types_rest, row, rows, types)
      :u32 -> decode_u32_decode_rows(bin, types_rest, row, rows, types)
      :u64 -> decode_u64_decode_rows(bin, types_rest, row, rows, types)
      :u128 -> decode_u128_decode_rows(bin, types_rest, row, rows, types)
      :u256 -> decode_u256_decode_rows(bin, types_rest, row, rows, types)
      :i8 -> decode_i8_decode_rows(bin, types_rest, row, rows, types)
      :i16 -> decode_i16_decode_rows(bin, types_rest, row, rows, types)
      :i32 -> decode_i32_decode_rows(bin, types_rest, row, rows, types)
      :i64 -> decode_i64_decode_rows(bin, types_rest, row, rows, types)
      :i128 -> decode_i128_decode_rows(bin, types_rest, row, rows, types)
      :i256 -> decode_i256_decode_rows(bin, types_rest, row, rows, types)
      :f32 -> decode_f32_decode_rows(bin, types_rest, row, rows, types)
      :f64 -> decode_f64_decode_rows(bin, types_rest, row, rows, types)
      :string -> decode_string_decode_rows(bin, types_rest, row, rows, types)
      :binary -> decode_binary_decode_rows(bin, types_rest, row, rows, types)

      # assuming it arrives as text and not "native" binary JSON
      # i.e. assumes `settings: [output_format_binary_write_json_as_string: 1]`
      # TODO
      :json ->
        decode_string_json_decode_rows(bin, types_rest, row, rows, types)

      # fresh dynamic value: start with an empty type stack
      :dynamic ->
        decode_dynamic(bin, [], types_rest, row, rows, types)

      # resumed dynamic value: carry on with the saved type stack
      {:dynamic, dynamic} ->
        decode_dynamic(bin, dynamic, types_rest, row, rows, types)

      # TODO utf8?
      {:fixed_string, size} ->
        case bin do
          <<s::size(^size)-bytes, rest::bytes>> ->
            decode_rows(types_rest, rest, [s | row], rows, types)

          _ ->
            to_be_continued(rows, bin, [type | types_rest], row)
        end

      :boolean -> decode_boolean_decode_rows(bin, types_rest, row, rows, types)
      :uuid -> decode_uuid_decode_rows(bin, types_rest, row, rows, types)
      :date -> decode_date_decode_rows(bin, types_rest, row, rows, types)
      :date32 -> decode_date32_decode_rows(bin, types_rest, row, rows, types)
      :time -> decode_time_decode_rows(bin, types_rest, row, rows, types)

      {:time64, time_unit} ->
        case bin do
          <<ticks::64-little-signed, rest::bytes>> ->
            time = time_after_midnight(ticks, time_unit)
            decode_rows(types_rest, rest, [time | row], rows, types)

          _ ->
            to_be_continued(rows, bin, [type | types_rest], row)
        end

      {:datetime, timezone} ->
        case bin do
          <<s::32-little, rest::bytes>> ->
            utc = DateTime.from_unix!(s)

            # no zone -> naive, "UTC" -> as is, otherwise shifted to the zone
            dt =
              case timezone do
                nil -> DateTime.to_naive(utc)
                "UTC" -> utc
                _ -> DateTime.shift_zone!(utc, timezone)
              end

            decode_rows(types_rest, rest, [dt | row], rows, types)

          _ ->
            to_be_continued(rows, bin, [type | types_rest], row)
        end

      {:decimal, size, scale} ->
        case bin do
          <<val::size(^size)-little-signed, rest::bytes>> ->
            # Decimal.new/3 takes sign, coefficient and exponent
            sign = if val < 0, do: -1, else: 1
            decimal = Decimal.new(sign, abs(val), -scale)
            decode_rows(types_rest, rest, [decimal | row], rows, types)

          _ ->
            to_be_continued(rows, bin, [type | types_rest], row)
        end

      {:nullable, inner_type} ->
        case bin do
          <<null_flag, rest::bytes>> ->
            # flag 0: a value of the inner type follows, flag 1: NULL
            case null_flag do
              0 -> decode_rows([inner_type | types_rest], rest, row, rows, types)
              1 -> decode_rows(types_rest, rest, [nil | row], rows, types)
            end

          _ ->
            to_be_continued(rows, bin, [type | types_rest], row)
        end

      # consumes no input
      :nothing ->
        decode_rows(types_rest, bin, [nil | row], rows, types)

      {:array, inner_type} ->
        decode_array_decode_rows(bin, inner_type, types_rest, row, rows, types)

      {:array_over, original_row} ->
        elements = :lists.reverse(row)
        decode_rows(types_rest, bin, [elements | original_row], rows, types)

      {:map, key_type, value_type} ->
        decode_map_decode_rows(bin, key_type, value_type, types_rest, row, rows, types)

      {:map_over, original_row} ->
        # `row` is reversed, so each pair shows up as [value, key]
        map = row |> Enum.chunk_every(2) |> Map.new(fn [v, k] -> {k, v} end)
        decode_rows(types_rest, bin, [map | original_row], rows, types)

      {:tuple, tuple_types} ->
        pending = tuple_types ++ [{:tuple_over, row} | types_rest]
        decode_rows(pending, bin, [], rows, types)

      {:tuple_over, original_row} ->
        tuple = List.to_tuple(:lists.reverse(row))
        decode_rows(types_rest, bin, [tuple | original_row], rows, types)

      {:variant, variant_types} ->
        case bin do
          # 255 is the variant type index for "nothing"
          <<255, rest::bytes>> ->
            decode_rows(types_rest, rest, [nil | row], rows, types)

          # TODO varint?
          <<variant_type_index::8, rest::bytes>> ->
            variant_type = Enum.at(variant_types, variant_type_index)
            decode_rows([variant_type | types_rest], rest, row, rows, types)

          _ ->
            to_be_continued(rows, bin, [type | types_rest], row)
        end

      {:datetime64, time_unit, timezone} ->
        case bin do
          <<s::64-little-signed, rest::bytes>> ->
            utc = DateTime.from_unix!(s, time_unit)

            # no zone -> naive, "UTC" -> as is, otherwise shifted to the zone
            dt =
              case timezone do
                nil -> DateTime.to_naive(utc)
                "UTC" -> utc
                _ -> DateTime.shift_zone!(utc, timezone)
              end

            decode_rows(types_rest, rest, [dt | row], rows, types)

          _ ->
            to_be_continued(rows, bin, [type | types_rest], row)
        end

      {:enum8, mapping} ->
        case bin do
          <<v::signed, rest::bytes>> ->
            decode_rows(types_rest, rest, [Map.fetch!(mapping, v) | row], rows, types)

          _ ->
            to_be_continued(rows, bin, [type | types_rest], row)
        end

      {:enum16, mapping} ->
        case bin do
          <<v::16-little-signed, rest::bytes>> ->
            decode_rows(types_rest, rest, [Map.fetch!(mapping, v) | row], rows, types)

          _ ->
            to_be_continued(rows, bin, [type | types_rest], row)
        end

      :ipv4 -> decode_ipv4_decode_rows(bin, types_rest, row, rows, types)
      :ipv6 -> decode_ipv6_decode_rows(bin, types_rest, row, rows, types)
      :point -> decode_point_decode_rows(bin, types_rest, row, rows, types)
    end
  end
1589

1590
  # End of a row and end of input at the same time: un-reverse the last row
  # and the row list, and report that no continuation state is needed.
  defp decode_rows([], <<>> = empty, row, rows, _types) do
    rows = :lists.reverse([:lists.reverse(row) | rows])
    {rows, empty, _no_state = nil}
  end

  # End of a row with input left: store the finished row and start the next
  # one with the full type list.
  defp decode_rows([], <<bin::bytes>>, row, rows, types) do
    row = :lists.reverse(row)
    decode_rows(types, bin, [], [row | rows], types)
  end

  # There is deliberately no clause for "types pending, input empty": the
  # `[type | types_rest]` clause matches any binary, the empty one included,
  # so such a clause placed here could never be reached.
1603

1604
  @compile inline: [to_be_continued: 4]
  # Result for input that ended in the middle of a row: the finished rows in
  # their original order, the undecoded bytes, and a `{:cont, types, row}`
  # state holding the pending types and the partially decoded row.
  defp to_be_continued(rows, bin, types_rest, row) do
    finished_rows = :lists.reverse(rows)
    continuation = {:cont, types_rest, row}
    {finished_rows, bin, continuation}
  end
1608

1609
  @compile inline: [decimal_size: 1]
  # https://clickhouse.com/docs/en/sql-reference/data-types/decimal/
  # Bit width used for a decimal of the given precision:
  # up to 9 digits -> 32, 10..18 -> 64, 19..38 -> 128, 39 and more -> 256.
  defp decimal_size(precision) when is_integer(precision) and precision >= 39, do: 256
  defp decimal_size(precision) when is_integer(precision) and precision >= 19, do: 128
  defp decimal_size(precision) when is_integer(precision) and precision >= 10, do: 64
  defp decimal_size(precision) when is_integer(precision), do: 32
1619

1620
  @compile inline: [time_unit: 1]
  # Maps a sub-second precision (0..9 decimal digits) to the number of ticks
  # per second, e.g. 3 -> 1000. The values are computed at compile time with
  # integer arithmetic, so no float rounding is involved.
  for precision <- 0..9 do
    ticks_per_second = Integer.pow(10, precision)

    defp time_unit(unquote(precision)), do: unquote(ticks_per_second)
  end
1626

1627
  @compile inline: [time_after_midnight: 2]
  # Converts `ticks` since midnight, counted in units of 1/`time_unit` of a
  # second, to a `Time`. Only values within a single day are accepted.
  defp time_after_midnight(ticks, time_unit)
       when ticks >= 0 and ticks < 86400 * time_unit do
    DateTime.to_time(DateTime.from_unix!(ticks, time_unit))
  end

  defp time_after_midnight(ticks, time_unit) do
    # since ClickHouse supports Time64 values of [-999:59:59.999999999, 999:59:59.999999999]
    # and Elixir's Time supports values of [00:00:00.000000, 23:59:59.999999]
    # we raise an error when ClickHouse's Time64 value is out of Elixir's Time range
    raise ArgumentError,
          "ClickHouse Time value #{:erlang.float_to_binary(ticks / time_unit, [:short])} (seconds) is out of Elixir's Time range (00:00:00.000000 - 23:59:59.999999)"

    # TODO: we could potentially decode ClickHouse's Time/Time64 values as Elixir's Duration when it's out of Elixir's Time range
  end
1641
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc