• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

code-shoily / choreo / 8f4569353e6f529ff097053f0fd6c68ee22ad359

26 Apr 2026 08:10PM UTC coverage: 92.503% (+0.2%) from 92.313%
8f4569353e6f529ff097053f0fd6c68ee22ad359

push

github

code-shoily
Update README and CHANGESET

1419 of 1534 relevant lines covered (92.5%)

20.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.89
/lib/choreo/workflow.ex
1
defmodule Choreo.Workflow do
2
  @moduledoc """
3
  Workflow / task orchestration diagram builder on top of Yog.
4

5
  `Choreo.Workflow` models automated task orchestration where nodes are
6
  process steps and edges are execution dependencies. It supports:
7

8
    * **Tasks** — automated steps with timeout and retry config
9
    * **Decisions** — conditional branching
10
    * **Fork / Join** — parallel execution paths
11
    * **Compensations** — Saga-pattern rollback handlers
12
    * **Events** — triggers, timers, signals
13
    * **Swimlanes** — group tasks by team, service, or domain
14

15
  ## When to use
16

17
  Use `Choreo.Workflow` when designing distributed business processes,
18
  Saga transactions, CI/CD pipelines, or approval flows. It identifies
19
  the critical path, finds parallelizable tasks, and verifies that every
20
  failure scenario has a compensation route.
21

22
  ## Further reading
23

24
    * [Saga Pattern (Microsoft)](https://learn.microsoft.com/en-us/azure/architecture/reference-architectures/saga/saga)
25
    * [BPMN 2.0 Specification](https://www.omg.org/spec/BPMN/2.0/)
26
    * [Workflow Patterns Initiative](http://www.workflowpatterns.com/)
27

28
  ## Quick Start
29

30
      workflow =
31
        Choreo.Workflow.new()
32
        |> Choreo.Workflow.add_start(:order_received)
33
        |> Choreo.Workflow.add_task(:charge_card, timeout_ms: 5000, retry: 3)
34
        |> Choreo.Workflow.add_task(:reserve_inventory, timeout_ms: 3000)
35
        |> Choreo.Workflow.add_decision(:sufficient_stock)
36
        |> Choreo.Workflow.add_task(:pack_items, timeout_ms: 10_000)
37
        |> Choreo.Workflow.add_task(:ship_order, timeout_ms: 5000)
38
        |> Choreo.Workflow.add_compensation(:refund_payment, for: :charge_card)
39
        |> Choreo.Workflow.add_end(:done)
40
        |> Choreo.Workflow.connect(:order_received, :charge_card)
41
        |> Choreo.Workflow.connect(:charge_card, :reserve_inventory)
42
        |> Choreo.Workflow.connect(:reserve_inventory, :sufficient_stock)
43
        |> Choreo.Workflow.connect(:sufficient_stock, :pack_items, condition: "yes")
44
        |> Choreo.Workflow.connect(:sufficient_stock, :refund_payment, condition: "no", edge_type: :compensation)
45
        |> Choreo.Workflow.connect(:pack_items, :ship_order)
46
        |> Choreo.Workflow.connect(:ship_order, :done)
47

48
      dot = Choreo.Workflow.to_dot(workflow)
49

50
  ## Diagram
51

52
  <div class="graphviz">
53
    digraph G {
54
      graph [rankdir=TB, splines=spline, nodesep=0.6, ranksep=1.2];
55
      node [shape=box, style=filled, fillcolor="white", fontname="Helvetica", fontsize=12, fontcolor="white"];
56
      edge [arrowhead=normal, color="#64748b", style=solid, fontname="Helvetica", fontsize=10, penwidth=1.0];
57

58
      done [label="done", penwidth="2.0", fillcolor="#ef4444", shape="doublecircle"];
59
      order_received [label="order_received", penwidth="2.0", fillcolor="#10b981", shape="circle"];
60
      charge_card [label="charge_card\n(5000ms)\nretry: 3", fillcolor="#3b82f6", shape="box3d"];
61
      reserve_inventory [label="reserve_inventory\n(3000ms)", fillcolor="#3b82f6", shape="box3d"];
62
      sufficient_stock [label="sufficient_stock", fillcolor="#8b5cf6", shape="diamond"];
63
      pack_items [label="pack_items\n(10000ms)", fillcolor="#3b82f6", shape="box3d"];
64
      ship_order [label="ship_order\n(5000ms)", fillcolor="#3b82f6", shape="box3d"];
65
      refund_payment [label="refund_payment", color="#ef4444", style="filled,dashed", fillcolor="#f87171", shape="note"];
66

67
      order_received -> charge_card [label="", style="solid", penwidth="1.0", fontcolor="#64748b", color="#64748b"];
68
      charge_card -> reserve_inventory [label="", style="solid", penwidth="1.0", fontcolor="#64748b", color="#64748b"];
69
      reserve_inventory -> sufficient_stock [label="", style="solid", penwidth="1.0", fontcolor="#64748b", color="#64748b"];
70
      sufficient_stock -> pack_items [style="solid", penwidth="1.0", fontcolor="#64748b", color="#64748b", label="yes"];
71
      sufficient_stock -> refund_payment [style="dashed", penwidth="1.5", fontcolor="#ef4444", color="#ef4444", label="no"];
72
      pack_items -> ship_order [label="", style="solid", penwidth="1.0", fontcolor="#64748b", color="#64748b"];
73
      ship_order -> done [label="", style="solid", penwidth="1.0", fontcolor="#64748b", color="#64748b"];
74
    }
75
  </div>
76

77
  ## Analysis
78

79
      # Longest-latency path through the workflow
80
      {:ok, path, latency} = Choreo.Workflow.Analysis.critical_path(workflow)
81

82
      # Tasks that can run in parallel
83
      Choreo.Workflow.Analysis.parallelizable_tasks(workflow)
84

85
      # Tasks with missing compensations
86
      Choreo.Workflow.Analysis.missing_compensations(workflow)
87

88
      # Validation
89
      Choreo.Workflow.Analysis.validate(workflow)
90
  """
91

92
  @typedoc """
  A workflow under construction.

    * `:graph` — the underlying multigraph holding nodes and edges
    * `:edge_meta` — per-edge metadata (label, condition, edge type, weight) keyed by edge id
    * `:clusters` — swimlane options keyed by cluster name
  """
  @type t :: %__MODULE__{
          graph: Yog.Multi.Graph.t(),
          edge_meta: %{optional(Yog.Multi.Graph.edge_id()) => map()},
          clusters: %{String.t() => map()}
        }

  # `graph` has no default (nil) until `new/0` fills it in.
  defstruct [:graph, edge_meta: %{}, clusters: %{}]
99

100
  # ----------------------------------------------------------------------------
  # NimbleOptions schemas
  #
  # Every option accepted by the builders is optional, and most builders accept
  # the same three display options. The shared specs are declared once and
  # reused so the schemas cannot drift apart. Key order within each schema is
  # kept exactly as it was when the schemas were written out in full.
  # ----------------------------------------------------------------------------

  @optional_string [type: :string, required: false]
  @optional_integer [type: :integer, required: false]
  # A reference given either as an atom or as a string (handlers, task ids).
  @optional_reference [type: {:or, [:atom, :string]}, required: false]

  # Options common to every node builder that delegates to `add_typed_node/4`.
  @node_schema [
    label: @optional_string,
    description: @optional_string,
    swimlane: @optional_string
  ]

  @add_swimlane_schema [label: @optional_string]

  @add_start_schema @node_schema

  @add_end_schema @node_schema

  @add_task_schema [
    timeout_ms: @optional_integer,
    retry: @optional_integer,
    retry_backoff_ms: @optional_integer,
    label: @optional_string,
    handler: @optional_reference,
    description: @optional_string,
    swimlane: @optional_string
  ]

  @add_decision_schema @node_schema

  @add_fork_schema @node_schema

  @add_join_schema @node_schema

  @add_compensation_schema [
    for: @optional_reference,
    label: @optional_string,
    handler: @optional_reference,
    description: @optional_string,
    swimlane: @optional_string
  ]

  @add_event_schema @node_schema

  @connect_schema [
    condition: @optional_string,
    edge_type: [
      type: {:in, [:sequence, :compensation, :retry, :failure, :timeout, :error]},
      required: false
    ],
    weight: [type: {:or, [:integer, :float]}, required: false],
    label: @optional_string
  ]
269

270
  # ============================================================================
271
  # Creation
272
  # ============================================================================
273

274
  @doc """
275
  Creates a new empty workflow graph.
276

277
  Workflow graphs are always directed.
278

279
  ## Examples
280

281
      iex> workflow = Choreo.Workflow.new()
282
      iex> Choreo.Workflow.nodes(workflow)
283
      []
284
      iex> Choreo.Workflow.starts(workflow)
285
      []
286
      iex> Choreo.Workflow.ends(workflow)
287
      []
288
  """
289
  @spec new() :: t()
290
  def new do
    # Only the graph needs an explicit value; `edge_meta` and `clusters`
    # come from the struct defaults (empty maps).
    directed_graph = Yog.Multi.new(:directed)
    %__MODULE__{graph: directed_graph}
  end
297

298
  # ============================================================================
299
  # Node builders
300
  # ============================================================================
301

302
  @doc """
303
  Adds a start node (entry point).
304

305
  ## Examples
306

307
      iex> workflow = Choreo.Workflow.new()
308
      iex> workflow = Choreo.Workflow.add_start(workflow, :begin, label: "Start")
309
      iex> Choreo.Workflow.starts(workflow)
310
      [:begin]
311
      iex> Map.get(workflow.graph.nodes, :begin).node_type
312
      :start
313

314
  ## Diagram
315

316
  <div class="graphviz">
317
    digraph G {
318
      graph [rankdir=TB, splines=spline, nodesep=0.6, ranksep=1.2];
319
      node [shape=box, style=filled, fillcolor="white", fontname="Helvetica", fontsize=12, fontcolor="white"];
320
      edge [arrowhead=normal, color="#64748b", style=solid, fontname="Helvetica", fontsize=10, penwidth=1.0];
321

322
      begin [label="Start", penwidth="2.0", fillcolor="#10b981", shape="circle"];
323
    }
324
  </div>
325
  """
326
  @spec add_start(t(), Yog.node_id(), keyword()) :: t()
327
  def add_start(%__MODULE__{} = workflow, id, opts \\ []) do
    # Options are checked against the start-node schema before the node is built.
    add_typed_node(workflow, id, :start, NimbleOptions.validate!(opts, @add_start_schema))
  end
331

332
  @doc """
333
  Adds an end node (terminal).
334

335
  ## Examples
336

337
      iex> workflow = Choreo.Workflow.new()
338
      iex> workflow = Choreo.Workflow.add_end(workflow, :finish, label: "End")
339
      iex> Choreo.Workflow.ends(workflow)
340
      [:finish]
341
      iex> Map.get(workflow.graph.nodes, :finish).node_type
342
      :end
343

344
  ## Diagram
345

346
  <div class="graphviz">
347
    digraph G {
348
      graph [rankdir=TB, splines=spline, nodesep=0.6, ranksep=1.2];
349
      node [shape=box, style=filled, fillcolor="white", fontname="Helvetica", fontsize=12, fontcolor="white"];
350
      edge [arrowhead=normal, color="#64748b", style=solid, fontname="Helvetica", fontsize=10, penwidth=1.0];
351

352
      finish [label="End", penwidth="2.0", fillcolor="#ef4444", shape="doublecircle"];
353
    }
354
  </div>
355
  """
356
  @spec add_end(t(), Yog.node_id(), keyword()) :: t()
357
  def add_end(%__MODULE__{} = workflow, id, opts \\ []) do
    # Options are checked against the end-node schema before the node is built.
    add_typed_node(workflow, id, :end, NimbleOptions.validate!(opts, @add_end_schema))
  end
361

362
  @doc """
  Adds an automated task node.

  ## Options

  All options are optional. This function applies no defaults: an option that
  is not given is stored as `nil` in the node data (the label falls back to
  the stringified `id`).

    * `:timeout_ms` — maximum time allowed for the task, in milliseconds
    * `:retry` — number of retry attempts on failure
    * `:retry_backoff_ms` — backoff between retries in milliseconds
    * `:label` — display label
    * `:handler` — handler name / reference
    * `:description` — tooltip text
    * `:swimlane` — swimlane group name

  ## Examples

      iex> workflow = Choreo.Workflow.new()
      iex> workflow = Choreo.Workflow.add_task(workflow, :process, timeout_ms: 5000, retry: 3)
      iex> Choreo.Workflow.tasks(workflow)
      [:process]
      iex> Map.get(workflow.graph.nodes, :process).timeout_ms
      5000
      iex> Map.get(workflow.graph.nodes, :process).retry
      3

  ## Diagram

  <div class="graphviz">
    digraph G {
      graph [rankdir=TB, splines=spline, nodesep=0.6, ranksep=1.2];
      node [shape=box, style=filled, fillcolor="white", fontname="Helvetica", fontsize=12, fontcolor="white"];
      edge [arrowhead=normal, color="#64748b", style=solid, fontname="Helvetica", fontsize=10, penwidth=1.0];

      process [label="process\n(5000ms)\nretry: 3", fillcolor="#3b82f6", shape="box3d"];
    }
  </div>
  """
  @spec add_task(t(), Yog.node_id(), keyword()) :: t()
  def add_task(%__MODULE__{} = workflow, id, opts \\ []) do
    # Raises on options outside the task schema (unknown keys or wrong types).
    opts = NimbleOptions.validate!(opts, @add_task_schema)
    add_typed_node(workflow, id, :task, opts)
  end
402

403
  @doc """
404
  Adds a decision / gateway node for conditional branching.
405

406
  ## Examples
407

408
      iex> workflow = Choreo.Workflow.new()
409
      iex> workflow = Choreo.Workflow.add_decision(workflow, :check)
410
      iex> Map.get(workflow.graph.nodes, :check).node_type
411
      :decision
412

413
  ## Diagram
414

415
  <div class="graphviz">
416
    digraph G {
417
      graph [rankdir=TB, splines=spline, nodesep=0.6, ranksep=1.2];
418
      node [shape=box, style=filled, fillcolor="white", fontname="Helvetica", fontsize=12, fontcolor="white"];
419
      edge [arrowhead=normal, color="#64748b", style=solid, fontname="Helvetica", fontsize=10, penwidth=1.0];
420

421
      check [label="check", fillcolor="#8b5cf6", shape="diamond"];
422
    }
423
  </div>
424
  """
425
  @spec add_decision(t(), Yog.node_id(), keyword()) :: t()
  def add_decision(%__MODULE__{} = workflow, id, opts \\ []) do
    # Raises on options outside the decision schema (`:label`, `:description`, `:swimlane`).
    opts = NimbleOptions.validate!(opts, @add_decision_schema)
    add_typed_node(workflow, id, :decision, opts)
  end
429

430
  @doc """
431
  Adds a fork node that splits execution into parallel paths.
432

433
  ## Examples
434

435
      iex> workflow = Choreo.Workflow.new()
436
      iex> workflow = Choreo.Workflow.add_fork(workflow, :split)
437
      iex> Map.get(workflow.graph.nodes, :split).node_type
438
      :fork
439
  """
440
  @spec add_fork(t(), Yog.node_id(), keyword()) :: t()
441
  def add_fork(%__MODULE__{} = workflow, id, opts \\ []) do
    # Options are checked against the fork schema before the node is built.
    add_typed_node(workflow, id, :fork, NimbleOptions.validate!(opts, @add_fork_schema))
  end
445

446
  @doc """
447
  Adds a join node that merges parallel paths.
448

449
  ## Examples
450

451
      iex> workflow = Choreo.Workflow.new()
452
      iex> workflow = Choreo.Workflow.add_join(workflow, :merge)
453
      iex> Map.get(workflow.graph.nodes, :merge).node_type
454
      :join
455
  """
456
  @spec add_join(t(), Yog.node_id(), keyword()) :: t()
  def add_join(%__MODULE__{} = workflow, id, opts \\ []) do
    # Raises on options outside the join schema (`:label`, `:description`, `:swimlane`).
    opts = NimbleOptions.validate!(opts, @add_join_schema)
    add_typed_node(workflow, id, :join, opts)
  end
460

461
  @doc """
  Adds a compensation / rollback node (Saga pattern).

  ## Options

    * `:for` — the task id this compensation rolls back (stored as `target_task`)
    * `:label` — display label (defaults to the stringified `id`)
    * `:handler` — handler name / reference
    * `:description` — tooltip text
    * `:swimlane` — swimlane group name

  ## Examples

      iex> workflow = Choreo.Workflow.new()
      iex> workflow = Choreo.Workflow.add_compensation(workflow, :rollback, for: :process)
      iex> Choreo.Workflow.compensations(workflow)
      [:rollback]
      iex> Map.get(workflow.graph.nodes, :rollback).target_task
      :process

  ## Diagram

  <div class="graphviz">
    digraph G {
      graph [rankdir=TB, splines=spline, nodesep=0.6, ranksep=1.2];
      node [shape=box, style=filled, fillcolor="white", fontname="Helvetica", fontsize=12, fontcolor="white"];
      edge [arrowhead=normal, color="#64748b", style=solid, fontname="Helvetica", fontsize=10, penwidth=1.0];

      rollback [label="rollback", color="#ef4444", style="filled,dashed", fillcolor="#f87171", shape="note"];
    }
  </div>
  """
  @spec add_compensation(t(), Yog.node_id(), keyword()) :: t()
  def add_compensation(%__MODULE__{} = workflow, id, opts \\ []) do
    opts = NimbleOptions.validate!(opts, @add_compensation_schema)

    # Compensation nodes record the task they undo (`target_task`) and carry
    # no timeout/retry fields, which is why they are not built through
    # `add_typed_node/4`.
    data = %{
      type: :workflow_node,
      node_type: :compensation,
      label: Keyword.get(opts, :label, to_string(id)),
      target_task: opts[:for],
      handler: opts[:handler],
      description: opts[:description]
    }

    data = put_swimlane(data, opts[:swimlane])
    %{workflow | graph: Yog.Multi.add_node(workflow.graph, id, data)}
  end
508

509
  @doc """
510
  Adds an event node (trigger, timer, signal).
511

512
  ## Examples
513

514
      iex> workflow = Choreo.Workflow.new()
515
      iex> workflow = Choreo.Workflow.add_event(workflow, :timer)
516
      iex> Map.get(workflow.graph.nodes, :timer).node_type
517
      :event
518
  """
519
  @spec add_event(t(), Yog.node_id(), keyword()) :: t()
  def add_event(%__MODULE__{} = workflow, id, opts \\ []) do
    # Raises on options outside the event schema (`:label`, `:description`, `:swimlane`).
    opts = NimbleOptions.validate!(opts, @add_event_schema)
    add_typed_node(workflow, id, :event, opts)
  end
523

524
  # ============================================================================
525
  # Edge builder
526
  # ============================================================================
527

528
  @doc """
  Connects two workflow nodes with an execution dependency.

  Multiple connections are allowed per `(from, to)` pair (parallel edges).

  ## Options

    * `:condition` — branch condition label (shown on decision edges)
    * `:edge_type` — `:sequence` (default), `:compensation`, `:retry`, `:failure`,
      `:timeout`, or `:error`
    * `:weight` — edge weight for path calculations. When omitted,
      `:compensation`, `:retry`, `:failure` and `:timeout` edges weigh `0`;
      any other edge weighs the target node's `timeout_ms` when that is a
      positive number, and `1` otherwise
    * `:label` — override edge label. When omitted, the label falls back to
      `:condition`, then to the built-in label of the edge type (if it has one)

  ## Examples

      iex> workflow = Choreo.Workflow.new()
      iex> workflow = workflow
      ...>   |> Choreo.Workflow.add_start(:a)
      ...>   |> Choreo.Workflow.add_task(:b)
      ...>   |> Choreo.Workflow.connect(:a, :b)
      iex> Choreo.Workflow.edges(workflow)
      [{:a, :b, 1}]
      iex> [{:a, :b, _, meta}] = Choreo.Workflow.edges_with_meta(workflow)
      iex> meta.edge_type
      :sequence

  ## Diagram

  <div class="graphviz">
    digraph G {
      graph [rankdir=TB, splines=spline, nodesep=0.6, ranksep=1.2];
      node [shape=box, style=filled, fillcolor="white", fontname="Helvetica", fontsize=12, fontcolor="white"];
      edge [arrowhead=normal, color="#64748b", style=solid, fontname="Helvetica", fontsize=10, penwidth=1.0];

      a [label="a", penwidth="2.0", fillcolor="#10b981", shape="circle"];
      b [label="b", fillcolor="#3b82f6", shape="box3d"];

      a -> b [label="", style="solid", penwidth="1.0", fontcolor="#64748b", color="#64748b"];
    }
  </div>
  """
  @spec connect(t(), Yog.node_id(), Yog.node_id(), keyword()) :: t()
  def connect(%__MODULE__{} = workflow, from, to, opts \\ []) do
    opts = NimbleOptions.validate!(opts, @connect_schema)
    edge_type = Keyword.get(opts, :edge_type, :sequence)
    condition = opts[:condition]

    # An explicit weight of 0 is honoured: only `nil`/`false` are falsy in
    # Elixir, so `||` falls through to the default only when no weight is given.
    weight = opts[:weight] || default_weight(workflow.graph, to, edge_type)

    # Label precedence: explicit label, then condition, then edge-type label.
    label = opts[:label] || condition || edge_type_label(edge_type)

    meta = %{
      label: label,
      condition: condition,
      edge_type: edge_type,
      weight: weight
    }

    # The multigraph hands back the id of the new edge; metadata is stored
    # under that id so parallel edges keep separate metadata.
    {graph, edge_id} = Yog.Multi.add_edge(workflow.graph, from, to, weight)
    edge_meta = Map.put(workflow.edge_meta, edge_id, meta)

    %{workflow | graph: graph, edge_meta: edge_meta}
  end
589

590
  # ============================================================================
591
  # Swimlanes
592
  # ============================================================================
593

594
  @doc """
595
  Adds a swimlane grouping.
596

597
  Swimlanes are rendered as subgraph clusters. Nodes can be assigned to a
598
  swimlane via the `:swimlane` option in node builders.
599

600
  ## Examples
601

602
      iex> workflow = Choreo.Workflow.new()
603
      iex> workflow = workflow
604
      ...>   |> Choreo.Workflow.add_swimlane("backend", label: "Backend Services")
605
      ...>   |> Choreo.Workflow.add_task(:api, swimlane: "backend")
606
      iex> Map.get(workflow.graph.nodes, :api)[:cluster]
607
      "cluster_backend"
608
  """
609
  @spec add_swimlane(t(), String.t() | atom(), keyword()) :: t()
610
  def add_swimlane(%__MODULE__{} = workflow, name, opts \\ []) do
    validated = NimbleOptions.validate!(opts, @add_swimlane_schema)

    # Clusters are keyed by the prefixed name, the same form node data uses
    # for its `:cluster` entry.
    cluster_key = Choreo.Internal.ensure_cluster_prefix(name)
    existing = workflow.clusters || %{}

    %{workflow | clusters: Map.put(existing, cluster_key, Map.new(validated))}
  end
616

617
  # ============================================================================
618
  # Rendering
619
  # ============================================================================
620

621
  @doc """
622
  Renders the workflow to DOT format.
623

624
  ## Options
625

626
    * `:theme` — `:default`, `:dark`, or a `Choreo.Theme` struct
627

628
  ## Examples
629

630
      iex> workflow = Choreo.Workflow.new()
631
      iex> workflow = workflow
632
      ...>   |> Choreo.Workflow.add_start(:start)
633
      ...>   |> Choreo.Workflow.add_task(:process)
634
      ...>   |> Choreo.Workflow.add_end(:end)
635
      ...>   |> Choreo.Workflow.connect(:start, :process)
636
      ...>   |> Choreo.Workflow.connect(:process, :end)
637
      iex> dot = Choreo.Workflow.to_dot(workflow)
638
      iex> String.contains?(dot, "digraph")
639
      true
640
      iex> String.contains?(dot, "process")
641
      true
642
  """
643
  @spec to_dot(t(), keyword()) :: String.t()
644
  # Rendering is delegated entirely to the DOT renderer module.
  def to_dot(%__MODULE__{} = workflow, opts \\ []),
    do: Choreo.Workflow.Render.DOT.to_dot(workflow, opts)
647

648
  # ============================================================================
649
  # Queries
650
  # ============================================================================
651

652
  @doc """
653
  Returns all node IDs in the workflow.
654

655
  ## Examples
656

657
      iex> workflow = Choreo.Workflow.new()
658
      iex> workflow = workflow
659
      ...>   |> Choreo.Workflow.add_start(:a)
660
      ...>   |> Choreo.Workflow.add_task(:b)
661
      iex> Enum.sort(Choreo.Workflow.nodes(workflow))
662
      [:a, :b]
663
  """
664
  @spec nodes(t()) :: [Yog.node_id()]
665
  # Node ids are the keys of the graph's node map.
  def nodes(%__MODULE__{graph: graph}), do: Map.keys(graph.nodes)
668

669
  @doc """
670
  Returns all edges as `{from, to, weight}` tuples.
671

672
  ## Examples
673

674
      iex> workflow = Choreo.Workflow.new()
675
      iex> workflow = workflow
676
      ...>   |> Choreo.Workflow.add_start(:a)
677
      ...>   |> Choreo.Workflow.add_task(:b)
678
      ...>   |> Choreo.Workflow.connect(:a, :b)
679
      iex> Choreo.Workflow.edges(workflow)
680
      [{:a, :b, 1}]
681
  """
682
  @spec edges(t()) :: [{Yog.node_id(), Yog.node_id(), number()}]
683
  def edges(%__MODULE__{graph: graph}) do
    # Each entry is `{edge_id, {from, to, weight}}`; the id is dropped and the
    # stored triple is returned as-is.
    Enum.map(graph.edges, fn {_edge_id, {_from, _to, _weight} = edge} -> edge end)
  end
688

689
  @doc """
690
  Returns all edges with their metadata as `{from, to, weight, meta}` tuples.
691
  """
692
  @spec edges_with_meta(t()) :: [{Yog.node_id(), Yog.node_id(), number(), map()}]
693
  def edges_with_meta(%__MODULE__{graph: graph, edge_meta: edge_meta}) do
    Enum.map(graph.edges, fn {edge_id, {from, to, weight}} ->
      # Edges with no recorded metadata are reported with an empty map.
      meta = Map.get(edge_meta, edge_id, %{})
      {from, to, weight, meta}
    end)
  end
698

699
  @doc """
700
  Collapses parallel edges into a simple Graph for algorithm analysis.
701
  """
702
  @spec to_simple_graph(t(), keyword()) :: Yog.Graph.t()
703
  def to_simple_graph(%__MODULE__{graph: graph}, opts \\ []) do
    # `:combine` (default `&min/2`) is passed to Yog to merge parallel edges.
    Yog.Multi.to_simple_graph(graph, Keyword.get(opts, :combine, &min/2))
  end
707

708
  @doc """
709
  Returns all task node IDs.
710

711
  ## Examples
712

713
      iex> workflow = Choreo.Workflow.new()
714
      iex> workflow = workflow
715
      ...>   |> Choreo.Workflow.add_task(:a)
716
      ...>   |> Choreo.Workflow.add_task(:b)
717
      ...>   |> Choreo.Workflow.add_start(:s)
718
      iex> Enum.sort(Choreo.Workflow.tasks(workflow))
719
      [:a, :b]
720
  """
721
  @spec tasks(t()) :: [Yog.node_id()]
722
  def tasks(%__MODULE__{graph: graph}) do
    # Keep only the ids of nodes whose data is tagged as a task.
    for {id, data} <- graph.nodes, data[:node_type] == :task, do: id
  end
727

728
  @doc """
729
  Returns all start node IDs.
730

731
  ## Examples
732

733
      iex> workflow = Choreo.Workflow.new()
734
      iex> workflow = workflow
735
      ...>   |> Choreo.Workflow.add_start(:a)
736
      ...>   |> Choreo.Workflow.add_start(:b)
737
      iex> Enum.sort(Choreo.Workflow.starts(workflow))
738
      [:a, :b]
739
  """
740
  @spec starts(t()) :: [Yog.node_id()]
741
  def starts(%__MODULE__{graph: graph}) do
    # Keep only the ids of nodes whose data is tagged as a start node.
    for {id, data} <- graph.nodes, data[:node_type] == :start, do: id
  end
746

747
  @doc """
748
  Returns all end node IDs.
749

750
  ## Examples
751

752
      iex> workflow = Choreo.Workflow.new()
753
      iex> workflow = workflow
754
      ...>   |> Choreo.Workflow.add_end(:a)
755
      ...>   |> Choreo.Workflow.add_end(:b)
756
      iex> Enum.sort(Choreo.Workflow.ends(workflow))
757
      [:a, :b]
758
  """
759
  @spec ends(t()) :: [Yog.node_id()]
760
  def ends(%__MODULE__{graph: graph}) do
    # Keep only the ids of nodes whose data is tagged as an end node.
    for {id, data} <- graph.nodes, data[:node_type] == :end, do: id
  end
765

766
  @doc """
767
  Returns all compensation node IDs.
768

769
  ## Examples
770

771
      iex> workflow = Choreo.Workflow.new()
772
      iex> workflow = workflow
773
      ...>   |> Choreo.Workflow.add_compensation(:rollback, for: :task_a)
774
      ...>   |> Choreo.Workflow.add_compensation(:undo, for: :task_b)
775
      iex> Enum.sort(Choreo.Workflow.compensations(workflow))
776
      [:rollback, :undo]
777
  """
778
  @spec compensations(t()) :: [Yog.node_id()]
779
  def compensations(%__MODULE__{graph: graph}) do
    # Keep only the ids of nodes whose data is tagged as a compensation.
    for {id, data} <- graph.nodes, data[:node_type] == :compensation, do: id
  end
784

785
  @doc """
  Returns the raw `Yog.Multi.Graph` struct underpinning the workflow.

  This is the value stored in the workflow's `:graph` field, returned
  unchanged. Edge metadata and swimlane clusters live on the workflow struct
  itself and are not part of the returned graph.

  ## Examples

      iex> workflow = Choreo.Workflow.new()
      iex> graph = Choreo.Workflow.to_graph(workflow)
      iex> graph.kind
      :directed
  """
  @spec to_graph(t()) :: Yog.Multi.Graph.t()
  def to_graph(%__MODULE__{graph: graph}), do: graph
797

798
  # ============================================================================
799
  # Private helpers
800
  # ============================================================================
801

802
  # Builds the data map for a node of the given `type` and adds it to the graph.
  defp add_typed_node(%__MODULE__{graph: graph} = workflow, id, type, opts) do
    base = %{
      type: :workflow_node,
      node_type: type,
      # The label falls back to the stringified id when none is given.
      label: Keyword.get(opts, :label, to_string(id))
    }

    # Copied straight from the options; each is `nil` when not given.
    passthrough =
      Map.new(
        [:timeout_ms, :retry, :retry_backoff_ms, :handler, :description],
        &{&1, opts[&1]}
      )

    node_data =
      base
      |> Map.merge(passthrough)
      |> put_swimlane(opts[:swimlane])

    updated_graph = Yog.Multi.add_node(graph, id, node_data)
    %{workflow | graph: updated_graph}
  end
817

818
  # Records the node's swimlane under `:cluster` (using the prefixed cluster
  # name); node data is returned untouched when no swimlane was requested.
  defp put_swimlane(data, swimlane) do
    case swimlane do
      nil -> data
      name -> Map.put(data, :cluster, Choreo.Internal.ensure_cluster_prefix(name))
    end
  end
822

823
  # Weight given to an edge when the caller supplies none.
  #
  # Compensation, retry, failure and timeout edges weigh nothing. Any other
  # edge weighs the target node's `timeout_ms` when that is a positive number,
  # and 1 otherwise (including when the target node is not in the graph).
  defp default_weight(_graph, _to, edge_type)
       when edge_type in [:compensation, :retry, :failure, :timeout],
       do: 0

  defp default_weight(graph, to, _edge_type) do
    with %{timeout_ms: ms} when is_number(ms) and ms > 0 <- Map.get(graph.nodes, to) do
      ms
    else
      _ -> 1
    end
  end
834

835
  # Built-in label for an edge type. Types without an entry in the table
  # (such as `:sequence`) have no label and yield `nil`.
  defp edge_type_label(edge_type) do
    Map.get(
      %{compensation: "compensate", retry: "retry", failure: "failure", timeout: "timeout"},
      edge_type
    )
  end
841
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc