• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JuliaLang / julia / #37433

pending completion
#37433

push

local

web-flow
Merge pull request #48513 from JuliaLang/jn/extend-once

ensure extension triggers are only run by the package that satisfied them

60 of 60 new or added lines in 1 file covered. (100.0%)

72324 of 82360 relevant lines covered (87.81%)

31376331.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.18
/stdlib/Distributed/src/pmap.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
struct BatchProcessingError <: Exception
4
    data
249✔
5
    ex
6
end
7

8
"""
9
    pgenerate([::WorkerPool], f, c...) -> iterator
10

11
Apply `f` to each element of `c` in parallel using available workers and tasks.
12

13
For multiple collection arguments, apply `f` elementwise.
14

15
Results are returned in order as they become available.
16

17
Note that `f` must be made available to all worker processes; see
18
[Code Availability and Loading Packages](@ref code-availability)
19
for details.
20
"""
21
function pgenerate(p::WorkerPool, f, c)
1✔
22
    if length(p) == 0
1✔
23
        return AsyncGenerator(f, c; ntasks=()->nworkers(p))
×
24
    end
25
    batches = batchsplit(c, min_batch_count = length(p) * 3)
1✔
26
    return Iterators.flatten(AsyncGenerator(remote(p, b -> asyncmap(f, b)), batches))
11✔
27
end
28
pgenerate(p::WorkerPool, f, c1, c...) = pgenerate(p, a->f(a...), zip(c1, c...))
×
29
pgenerate(f, c) = pgenerate(default_worker_pool(), f, c)
1✔
30
pgenerate(f, c1, c...) = pgenerate(a->f(a...), zip(c1, c...))
×
31

32
"""
33
    pmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> collection
34

35
Transform collection `c` by applying `f` to each element using available
36
workers and tasks.
37

38
For multiple collection arguments, apply `f` elementwise.
39

40
Note that `f` must be made available to all worker processes; see
41
[Code Availability and Loading Packages](@ref code-availability) for details.
42

43
If a worker pool is not specified, all available workers, i.e., the default worker pool
44
is used.
45

46
By default, `pmap` distributes the computation over all specified workers. To use only the
47
local process and distribute over tasks, specify `distributed=false`.
48
This is equivalent to using [`asyncmap`](@ref). For example,
49
`pmap(f, c; distributed=false)` is equivalent to `asyncmap(f,c; ntasks=()->nworkers())`
50

51
`pmap` can also use a mix of processes and tasks via the `batch_size` argument. For batch sizes
52
greater than 1, the collection is processed in multiple batches, each of length `batch_size` or less.
53
A batch is sent as a single request to a free worker, where a local [`asyncmap`](@ref) processes
54
elements from the batch using multiple concurrent tasks.
55

56
Any error stops `pmap` from processing the remainder of the collection. To override this behavior
57
you can specify an error handling function via argument `on_error` which takes in a single argument, i.e.,
58
the exception. The function can stop the processing by rethrowing the error, or, to continue, return any value
59
which is then returned inline with the results to the caller.
60

61
Consider the following two examples. The first one returns the exception object inline,
62
the second a 0 in place of any exception:
63
```julia-repl
64
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
65
4-element Array{Any,1}:
66
 1
67
  ErrorException("foo")
68
 3
69
  ErrorException("foo")
70

71
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
72
4-element Array{Int64,1}:
73
 1
74
 0
75
 3
76
 0
77
```
78

79
Errors can also be handled by retrying failed computations. Keyword arguments `retry_delays` and
80
`retry_check` are passed through to [`retry`](@ref) as keyword arguments `delays` and `check`
81
respectively. If batching is specified, and an entire batch fails, all items in
82
the batch are retried.
83

84
Note that if both `on_error` and `retry_delays` are specified, the `on_error` hook is called
85
before retrying. If `on_error` does not throw (or rethrow) an exception, the element will not
86
be retried.
87

88
Example: On errors, retry `f` on an element a maximum of 3 times without any delay between retries.
89
```julia
90
pmap(f, c; retry_delays = zeros(3))
91
```
92

93
Example: Retry `f` only if the exception is not of type [`InexactError`](@ref), with exponentially increasing
94
delays up to 3 times. Return a `NaN` in place for all `InexactError` occurrences.
95
```julia
96
pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow()), retry_delays = ExponentialBackOff(n = 3))
97
```
98
"""
99
function pmap(f, p::AbstractWorkerPool, c; distributed=true, batch_size=1, on_error=nothing,
200✔
100
                                           retry_delays=[], retry_check=nothing)
101
    f_orig = f
100✔
102
    # Don't do remote calls if there are no workers.
103
    if (length(p) == 0) || (length(p) == 1 && fetch(p.channel) == myid())
199✔
104
        distributed = false
46✔
105
    end
106

107
    # Don't do batching if not doing remote calls.
108
    if !distributed
100✔
109
        batch_size = 1
62✔
110
    end
111

112
    # If not batching, do simple remote call.
113
    if batch_size == 1
100✔
114
        if on_error !== nothing
92✔
115
            f = wrap_on_error(f, on_error)
14✔
116
        end
117

118
        if distributed
92✔
119
            f = remote(p, f)
30✔
120
        end
121

122
        if length(retry_delays) > 0
92✔
123
            f = wrap_retry(f, retry_delays, retry_check)
16✔
124
        end
125

126
        return asyncmap(f, c; ntasks=()->nworkers(p))
4,300✔
127
    else
128
        # During batch processing, We need to ensure that if on_error is set, it is called
129
        # for each element in error, and that we return as many elements as the original list.
130
        # retry, if set, has to be called element wise and we will do a best-effort
131
        # to ensure that we do not call mapped function on the same element more than length(retry_delays).
132
        # This guarantee is not possible in case of worker death / network errors, wherein
133
        # we will retry the entire batch on a new worker.
134

135
        handle_errors = ((on_error !== nothing) || (length(retry_delays) > 0))
8✔
136

137
        # Unlike the non-batch case, in batch mode, we trap all errors and the on_error hook (if present)
138
        # is processed later in non-batch mode.
139
        if handle_errors
8✔
140
            f = wrap_on_error(f, (x,e)->BatchProcessingError(x,e); capture_data=true)
255✔
141
        end
142

143
        f = wrap_batch(f, p, handle_errors)
8✔
144
        results = asyncmap(f, c; ntasks=()->nworkers(p), batch_size=batch_size)
816✔
145

146
        # process errors if any.
147
        if handle_errors
8✔
148
            process_batch_errors!(p, f_orig, results, on_error, retry_delays, retry_check)
6✔
149
        end
150

151
        return results
8✔
152
    end
153
end
154

155
pmap(f, p::AbstractWorkerPool, c1, c...; kwargs...) = pmap(a->f(a...), p, zip(c1, c...); kwargs...)
28✔
156
pmap(f, c; kwargs...) = pmap(f, default_worker_pool(), c; kwargs...)
93✔
157
pmap(f, c1, c...; kwargs...) = pmap(a->f(a...), zip(c1, c...); kwargs...)
15✔
158

159
function wrap_on_error(f, on_error; capture_data=false)
100✔
160
    return x -> begin
1,956✔
161
        try
1,906✔
162
            f(x)
1,906✔
163
        catch e
164
            if capture_data
564✔
165
                on_error(x, e)
249✔
166
            else
167
                on_error(e)
315✔
168
            end
169
        end
170
    end
171
end
172

173
function wrap_retry(f, retry_delays, retry_check)
16✔
174
    retry(delays=retry_delays, check=retry_check) do x
16✔
175
        try
1,376✔
176
            f(x)
1,376✔
177
        catch e
178
            rethrow(extract_exception(e))
27✔
179
        end
180
    end
181
end
182

183
function wrap_batch(f, p, handle_errors)
8✔
184
    f = asyncmap_batch(f)
8✔
185
    return batch -> begin
412✔
186
        try
404✔
187
            remotecall_fetch(f, p, batch)
404✔
188
        catch e
189
            if handle_errors
×
190
                return Any[BatchProcessingError(b, e) for b in batch]
×
191
            else
192
                rethrow()
×
193
            end
194
        end
195
    end
196
end
197

198
asyncmap_batch(f) = batch -> asyncmap(x->f(x...), batch)
1,212✔
199
extract_exception(e) = isa(e, RemoteException) ? e.captured.ex : e
27✔
200

201

202
function process_batch_errors!(p, f, results, on_error, retry_delays, retry_check)
6✔
203
    # Handle all the ones in error in another pmap, with batch size set to 1
204
    reprocess = Tuple{Int,BatchProcessingError}[]
6✔
205
    for (idx, v) in enumerate(results)
12✔
206
        if isa(v, BatchProcessingError)
600✔
207
            push!(reprocess, (idx,v))
249✔
208
        end
209
    end
1,194✔
210

211
    if length(reprocess) > 0
6✔
212
        errors = [x[2] for x in reprocess]
6✔
213
        exceptions = Any[x.ex for x in errors]
12✔
214
        state = iterate(retry_delays)
8✔
215
        state !== nothing && (state = state[2])
6✔
216
        error_processed = let state=state
6✔
217
            if (length(retry_delays)::Int > 0) &&
6✔
218
                    (retry_check === nothing || all([retry_check(state,ex)[2] for ex in exceptions]))
219
                # BatchProcessingError.data is a tuple of original args
220
                pmap(x->f(x...), p, Any[x.data for x in errors];
161✔
221
                        on_error = on_error, retry_delays = collect(retry_delays)[2:end::Int], retry_check = retry_check)
222
            elseif on_error !== nothing
2✔
223
                map(on_error, exceptions)
2✔
224
            else
225
                throw(CompositeException(exceptions))
4✔
226
            end
227
        end
228

229
        for (idx, v) in enumerate(error_processed)
12✔
230
            results[reprocess[idx][1]] = v
249✔
231
        end
492✔
232
    end
233
    nothing
6✔
234
end
235

236
"""
237
    head_and_tail(c, n) -> head, tail
238

239
Return `head`: the first `n` elements of `c`;
240
and `tail`: an iterator over the remaining elements.
241

242
```jldoctest
243
julia> b, c = Distributed.head_and_tail(1:10, 3)
244
([1, 2, 3], Base.Iterators.Rest{UnitRange{Int64}, Int64}(1:10, 3))
245

246
julia> collect(c)
247
7-element Vector{Int64}:
248
  4
249
  5
250
  6
251
  7
252
  8
253
  9
254
 10
255
```
256
"""
257
function head_and_tail(c, n)
7✔
258
    head = Vector{eltype(c)}(undef, n)
7✔
259
    n == 0 && return (head, c)
7✔
260
    i = 1
5✔
261
    y = iterate(c)
10✔
262
    y === nothing && return (resize!(head, 0), ())
5✔
263
    head[i] = y[1]
4✔
264
    while i < n
10✔
265
        y = iterate(c, y[2])
14✔
266
        y === nothing && return (resize!(head, i), ())
8✔
267
        i += 1
6✔
268
        head[i] = y[1]
6✔
269
    end
6✔
270
    return head, Iterators.rest(c, y[2])
2✔
271
end
272

273
"""
274
    batchsplit(c; min_batch_count=1, max_batch_size=100) -> iterator
275

276
Split a collection into at least `min_batch_count` batches.
277

278
Equivalent to `partition(c, max_batch_size)` when `length(c) >> max_batch_size`.
279
"""
280
function batchsplit(c; min_batch_count=1, max_batch_size=100)
2✔
281
    if min_batch_count < 1
1✔
282
        throw(ArgumentError("min_batch_count must be ≥ 1, got $min_batch_count"))
×
283
    end
284

285
    if max_batch_size < 1
1✔
286
        throw(ArgumentError("max_batch_size must be ≥ 1, got $max_batch_size"))
×
287
    end
288

289
    # Split collection into batches, then peek at the first few batches
290
    batches = Iterators.partition(c, max_batch_size)
1✔
291
    head, tail = head_and_tail(batches, min_batch_count)
1✔
292

293
    # If there are not enough batches, use a smaller batch size
294
    if length(head) < min_batch_count
1✔
295
        batch_size = max(1, div(sum(length, head), min_batch_count))
1✔
296
        return Iterators.partition(collect(Iterators.flatten(head)), batch_size)
1✔
297
    end
298

299
    return Iterators.flatten((head, tail))
×
300
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc