• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In
No new info detected.

JuliaLang / julia / #37474

pending completion
#37474

push

local

web-flow
irinterp: Allow setting all IR flags (#48993)

Currently, `IR_FLAG_NOTHROW` is the only flag that irinterp is allowed to
set on statements, under the assumption that in order for a call to
be irinterp-eligible, it must have been proven `:foldable`, thus `:effect_free`,
and thus `IR_FLAG_EFFECT_FREE` was assumed to have been set. That reasoning
was sound at the time this code was written, but we have since introduced
`EFFECT_FREE_IF_INACCESSIBLEMEMONLY`, which breaks the reasoning that
an `:effect_free` inference for the whole function implies the flag on
every statement. As a result, we were failing to DCE otherwise dead
statements if the IR came from irinterp.

3 of 3 new or added lines in 1 file covered. (100.0%)

70258 of 82316 relevant lines covered (85.35%)

32461773.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.21
/base/asyncmap.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
using Base.Iterators: Enumerate
4

5
"""
6
    asyncmap(f, c...; ntasks=0, batch_size=nothing)
7

8
Uses multiple concurrent tasks to map `f` over a collection (or multiple
9
equal length collections). For multiple collection arguments, `f` is
10
applied elementwise.
11

12
`ntasks` specifies the number of tasks to run concurrently.
13
Depending on the length of the collections, if `ntasks` is unspecified,
14
up to 100 tasks will be used for concurrent mapping.
15

16
`ntasks` can also be specified as a zero-arg function. In this case, the
17
number of tasks to run in parallel is checked before processing every element and a new
18
task is started if the value returned by `ntasks()` is greater than the current number
19
of tasks.
20

21
If `batch_size` is specified, the collection is processed in batch mode. `f` must
22
then be a function that must accept a `Vector` of argument tuples and must
23
return a vector of results. The input vector will have a length of `batch_size` or less.
24

25
The following examples highlight execution in different tasks by returning
26
the `objectid` of the tasks in which the mapping function is executed.
27

28
First, with `ntasks` undefined, each element is processed in a different task.
29
```
30
julia> tskoid() = objectid(current_task());
31

32
julia> asyncmap(x->tskoid(), 1:5)
33
5-element Array{UInt64,1}:
34
 0x6e15e66c75c75853
35
 0x440f8819a1baa682
36
 0x9fb3eeadd0c83985
37
 0xebd3e35fe90d4050
38
 0x29efc93edce2b961
39

40
julia> length(unique(asyncmap(x->tskoid(), 1:5)))
41
5
42
```
43

44
With `ntasks=2` all elements are processed in 2 tasks.
45
```
46
julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
47
5-element Array{UInt64,1}:
48
 0x027ab1680df7ae94
49
 0xa23d2f80cd7cf157
50
 0x027ab1680df7ae94
51
 0xa23d2f80cd7cf157
52
 0x027ab1680df7ae94
53

54
julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
55
2
56
```
57

58
With `batch_size` defined, the mapping function needs to be changed to accept an array
59
of argument tuples and return an array of results. `map` is used in the modified mapping
60
function to achieve this.
61
```
62
julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
63
batch_func (generic function with 1 method)
64

65
julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
66
5-element Array{String,1}:
67
 "args_tuple: (1,), element_val: 1, task: 9118321258196414413"
68
 "args_tuple: (2,), element_val: 2, task: 4904288162898683522"
69
 "args_tuple: (3,), element_val: 3, task: 9118321258196414413"
70
 "args_tuple: (4,), element_val: 4, task: 4904288162898683522"
71
 "args_tuple: (5,), element_val: 5, task: 9118321258196414413"
72
```
73

74
!!! note
75
    Currently, all tasks in Julia are executed in a single OS thread co-operatively. Consequently,
76
    `asyncmap` is beneficial only when the mapping function involves any I/O - disk, network, remote
77
    worker invocation, etc.
78

79
"""
80
# Generic entry point: forwards every argument unchanged to `async_usemap`.
asyncmap(f, c...; ntasks=0, batch_size=nothing) =
    async_usemap(f, c...; ntasks=ntasks, batch_size=batch_size)
83

84
# Validate the keyword arguments, build the function each worker task runs,
# start the workers and then drive the mapping over `c...`.
function async_usemap(f, c...; ntasks=0, batch_size=nothing)
    ntasks = verify_ntasks(c[1], ntasks)
    batch_size = verify_batch_size(batch_size)

    exec_func = if batch_size === nothing
        # One element at a time: store the mapped value in the element's Ref.
        (ref, args) -> (ref.x = f(args...))
    else
        # Batch mode: each channel item is a `(Ref, args)` pair.
        function (batch)
            # Separate the result Refs from the argument tuples.
            refs = map(item -> item[1], batch)
            argtuples = map(item -> item[2], batch)

            # `f` gets the whole vector of argument tuples; its k-th result
            # is written to the k-th Ref of the batch.
            for (k, value) in enumerate(f(argtuples))
                refs[k].x = value
            end
            return nothing
        end
    end

    chnl, worker_tasks = setup_chnl_and_tasks(exec_func, ntasks, batch_size)
    return wrap_n_exec_twice(chnl, worker_tasks, ntasks, exec_func, c...)
end
105

106
# Build the message reported when `batch_size` is rejected.
function batch_size_err_str(batch_size)
    return string("batch_size must be specified as a positive integer. batch_size=", batch_size)
end
2✔
107
# Check the `batch_size` keyword.
#
# `nothing` is returned unchanged. A `Number` is converted with `Int(...)` and
# must be at least 1. Anything else, or a converted value below 1, raises an
# `ArgumentError`.
function verify_batch_size(batch_size)
    batch_size === nothing && return batch_size

    if !isa(batch_size, Number)
        throw(ArgumentError(batch_size_err_str(batch_size)))
    end

    n = Int(batch_size)
    n >= 1 || throw(ArgumentError(batch_size_err_str(n)))
    return n
end
118

119

120
# Check the `ntasks` keyword and resolve the default.
#
# Accepts a non-negative `Number` or a `Function`; anything else raises an
# `ArgumentError`. A value of 0 is replaced by the length of `iterable` limited
# to the range 1..100, or by 100 when `iterable` has no known length.
function verify_ntasks(iterable, ntasks)
    acceptable = isa(ntasks, Function) || (isa(ntasks, Number) && ntasks >= 0)
    if !acceptable
        msg = string("ntasks must be specified as a positive integer or a 0-arg function. ntasks=", ntasks)
        throw(ArgumentError(msg))
    end

    # Any explicit, non-zero setting (including a function) is used as given.
    ntasks == 0 || return ntasks

    return haslength(iterable) ? clamp(length(iterable), 1, 100) : 100
end
135

136
# Build the function that is mapped over the input collections and pass it to
# `maptwice`.
function wrap_n_exec_twice(chnl, worker_tasks, ntasks, exec_func, c...)
    # Driver side: make a fresh Ref for the result, send it with the argument
    # tuple over the channel, and return the Ref.
    function push_arg_to_channel(x...)
        r = Ref{Any}(nothing)
        put!(chnl, (r, x))
        return r
    end

    map_f = if ntasks isa Function
        function (x...)
            # Re-check the requested number of tasks for every element and
            # start one more worker if there are fewer than requested. Having
            # more workers than requested is fine; fewer is inefficient.
            # NOTE(review): this call omits `batch_size`, so such a worker uses
            # the default `nothing`, unlike the workers the caller created up
            # front -- confirm this is intended when batching.
            if length(worker_tasks) < ntasks()
                start_worker_task!(worker_tasks, exec_func, chnl)
            end
            return push_arg_to_channel(x...)
        end
    else
        push_arg_to_channel
    end

    return maptwice(map_f, chnl, worker_tasks, c...)
end
155

156
# Map `wrapped_f` over `c...` to obtain Refs, close the channel, wait for the
# worker tasks, then map again to pull the values out of the Refs.
function maptwice(wrapped_f, chnl, worker_tasks, c...)
    # First pass: returns a collection of Refs (or a single Ref).
    put_failure = nothing
    local refs
    try
        refs = map(wrapped_f, c...)
    catch err
        # An InvalidStateException is held back here: the channel may have been
        # closed because a worker failed, and a worker's error is reported in
        # preference to it below. Anything else propagates at once.
        err isa InvalidStateException || rethrow()
        put_failure = err
    end

    # Close the channel, then wait for every worker task to finish.
    close(chnl)

    # A worker hands back an exception as its result; throw the first one found.
    for t in worker_tasks
        outcome = fetch(t)
        outcome isa Exception && throw(outcome)
    end

    # No worker failed, so a held-back failure from the first pass is genuine.
    put_failure === nothing || throw(put_failure)

    # Second pass: a lone Ref is the scalar case, otherwise unwrap every Ref.
    return refs isa Ref ? refs.x : map(r -> r.x, refs)
end
190

191
# Create the communication channel and start the initial worker tasks.
# Returns the tuple `(chnl, worker_tasks)`.
function setup_chnl_and_tasks(exec_func, ntasks, batch_size=nothing)
    if ntasks isa Function
        requested = ntasks()::Int
        # Start at least one worker task.
        nt = requested == 0 ? 1 : requested
    else
        nt = ntasks::Int
    end

    # Use an unbuffered channel for communicating with the worker tasks. In the
    # event of an error in any of the worker tasks, the channel is closed. This
    # results in the `put!` in the driver task failing immediately.
    chnl = Channel(0)
    worker_tasks = []
    for _ in 1:nt
        start_worker_task!(worker_tasks, exec_func, chnl, batch_size)
    end
    yield()
    return (chnl, worker_tasks)
end
211

212
# Start one asynchronous worker that consumes items from `chnl` and add it to
# `worker_tasks`.
#
# When `batch_size` is a `Number`, items are gathered into a vector of at most
# `batch_size` entries and `exec_func` is called with that vector. Otherwise
# `exec_func` is called once per item, with the item splatted. The task's
# result is `nothing`, or the captured exception if anything in the loop threw.
function start_worker_task!(worker_tasks, exec_func, chnl, batch_size=nothing)
    worker = @async begin
        failure = nothing

        try
            if batch_size isa Number
                while isopen(chnl)
                    # The mapping function expects an array of input args, as
                    # it processes elements in a batch.
                    batch = Any[]
                    for item in chnl
                        push!(batch, item)
                        length(batch) == batch_size && break
                    end
                    # Only call the mapping function if something was collected.
                    isempty(batch) || exec_func(batch)
                end
            else
                for item in chnl
                    exec_func(item...)
                end
            end
        catch err
            # Close the channel on error, and keep the exception as the task's
            # result instead of letting the task fail.
            close(chnl)
            failure = capture_exception(err, catch_backtrace())
        end
        failure
    end
    return push!(worker_tasks, worker)
end
245

246
# Special handling for some types.
247
# String input: map into a `Char` buffer with `asyncmap!` and build a `String`
# from the buffer.
function asyncmap(f, s::AbstractString; kwargs...)
    chars = Vector{Char}(undef, length(s))
    asyncmap!(f, chars, s; kwargs...)
    return String(chars)
end
252

253
# map on a single BitArray returns a BitArray if the mapping function is boolean.
254
# BitArray input: the mapped result is converted to a `BitArray` when its
# element type is `Bool`; otherwise it is returned as produced.
function asyncmap(f, b::BitArray; kwargs...)
    mapped = async_usemap(f, b; kwargs...)
    return eltype(mapped) == Bool ? BitArray(mapped) : mapped
end
261

262
mutable struct AsyncCollector
    f                       # mapping function
    results                 # container the mapped values are written into
    enumerator::Enumerate   # yields (index, args) pairs
    ntasks                  # number of tasks, or a function returning it
    batch_size
    nt_check::Bool          # check number of tasks on every iteration

    # `nt_check` is derived: it is true exactly when `ntasks` is a function.
    function AsyncCollector(f, r, en::Enumerate, ntasks, batch_size)
        return new(f, r, en, ntasks, batch_size, ntasks isa Function)
    end
end
272

273
"""
274
    AsyncCollector(f, results, c...; ntasks=0, batch_size=nothing) -> iterator
275

276
Return an iterator which applies `f` to each element of `c` asynchronously
277
and collects output into `results`.
278

279
Keyword args `ntasks` and `batch_size` have the same behavior as in
280
[`asyncmap`](@ref). If `batch_size` is specified, `f` must
281
be a function which operates on an array of argument tuples.
282

283
!!! note
284
    `iterate(::AsyncCollector, state) -> (nothing, state)`. A successful return
285
    from `iterate` indicates that the next element from the input collection is
286
    being processed asynchronously. It blocks until a free worker task becomes
287
    available.
288

289
!!! note
290
    `for _ in AsyncCollector(f, results, c...; ntasks=1) end` is equivalent to
291
    `map!(f, results, c...)`.
292
"""
293
# Keyword-argument constructor: zips the input collections, enumerates the
# result, and defers to the positional constructor.
AsyncCollector(f, results, c...; ntasks=0, batch_size=nothing) =
    AsyncCollector(f, results, enumerate(zip(c...)), ntasks, batch_size)
296

297
mutable struct AsyncCollectorState
    chnl::Channel
    worker_tasks::Vector{Task}
    enum_state      # enumerator state; left undefined by the constructor

    # Converts the task list to `Vector{Task}`; `enum_state` stays unset.
    function AsyncCollectorState(chnl::Channel, worker_tasks::Vector)
        return new(chnl, convert(Vector{Task}, worker_tasks))
    end
end
304

305
# First iteration step: validate the settings, start the channel and worker
# tasks, then continue with the stateful method.
function iterate(itr::AsyncCollector)
    itr.ntasks = verify_ntasks(itr.enumerator, itr.ntasks)
    itr.batch_size = verify_batch_size(itr.batch_size)

    # Each worker stores the mapped value at the element's index in `results`.
    store_result = (i, args) -> (itr.results[i] = itr.f(args...))
    chnl, worker_tasks = setup_chnl_and_tasks(store_result, itr.ntasks, itr.batch_size)

    state = AsyncCollectorState(chnl, worker_tasks)
    return iterate(itr, state)
end
312

313
# Close the channel, wait for every worker task, and clear the task list.
# A worker's result that is an exception is thrown.
function wait_done(itr::AsyncCollector, state::AsyncCollectorState)
    close(state.chnl)

    # wait for all tasks to finish
    for t in state.worker_tasks
        outcome = fetch(t)
        outcome isa Exception && throw(outcome)
    end
    return empty!(state.worker_tasks)
end
320

321
# Advance the collector by one element: hand the next `(index, args)` pair to
# a worker task over the channel.
#
# Returns `(nothing, state)` once the pair has been accepted by the channel,
# or `nothing` after the input is exhausted and `wait_done` has finished.
function iterate(itr::AsyncCollector, state::AsyncCollectorState)
    if itr.nt_check && (length(state.worker_tasks) < itr.ntasks())
        # A worker started on demand must do the same work as the workers
        # created in `iterate(itr)`: store the mapped value in `itr.results`.
        # Passing `itr.f` directly would call `itr.f(i, args)` and never write
        # a result. `itr.batch_size` is passed to match those workers too.
        exec_func = (i, args) -> (itr.results[i] = itr.f(args...))
        start_worker_task!(state.worker_tasks, exec_func, state.chnl, itr.batch_size)
    end

    # Get index and mapped function arguments from enumeration iterator.
    # `enum_state` is undefined until the first element has been taken.
    y = isdefined(state, :enum_state) ?
        iterate(itr.enumerator, state.enum_state) :
        iterate(itr.enumerator)
    if y === nothing
        wait_done(itr, state)
        return nothing
    end
    (i, args), state.enum_state = y
    put!(state.chnl, (i, args))

    return (nothing, state)
end
339

340
"""
341
    AsyncGenerator(f, c...; ntasks=0, batch_size=nothing) -> iterator
342

343
Apply `f` to each element of `c` using at most `ntasks` asynchronous tasks.
344

345
Keyword args `ntasks` and `batch_size` have the same behavior as in
346
[`asyncmap`](@ref). If `batch_size` is specified, `f` must
347
be a function which operates on an array of argument tuples.
348

349
!!! note
350
    `collect(AsyncGenerator(f, c...; ntasks=1))` is equivalent to
351
    `map(f, c...)`.
352
"""
353
mutable struct AsyncGenerator
354
    # Collector that does the asynchronous work; iteration over the generator
    # reads mapped values from its `results`.
    collector::AsyncCollector
1✔
355
end
356

357
# Build a generator around a collector whose results go into a
# `Dict{Int,Any}` keyed by element index.
function AsyncGenerator(f, c...; ntasks=0)
    collector = AsyncCollector(f, Dict{Int,Any}(), c...; ntasks=ntasks)
    return AsyncGenerator(collector)
end
360

361
mutable struct AsyncGeneratorState
    i::Int                  # index of the result being waited for
    collector_done::Bool    # set once the collector has been exhausted
    collector_state::AsyncCollectorState    # left undefined by the constructor

    # Starts with `collector_done == false` and `collector_state` unset.
    function AsyncGeneratorState(i::Int)
        return new(i, false)
    end
end
367

368
# Produce the next mapped value in input order.
#
# The collector is advanced until the result for index `state.i` is present or
# the collector is exhausted; the result is then removed from the dictionary
# and returned together with the state.
function iterate(itr::AsyncGenerator, state::AsyncGeneratorState=AsyncGeneratorState(0))
    state.i += 1

    results_dict = itr.collector.results
    while !(state.collector_done || haskey(results_dict, state.i))
        step = isdefined(state, :collector_state) ?
            iterate(itr.collector, state.collector_state) :
            iterate(itr.collector)
        if step === nothing
            # The collector waits for the async tasks to finish before it
            # returns `nothing`. If the index we are looking for is still
            # missing after that, it is an error.
            state.collector_done = true
            break
        end
        state.collector_state = step[2]
    end

    # Everything has been produced and consumed.
    state.collector_done && isempty(results_dict) && return nothing

    # `pop!` fetches and removes the entry; it throws if the index is absent.
    r = pop!(results_dict, state.i)
    return (r, state)
end
390

391
# Iterator traits for `AsyncGenerator`: size and element type are reported as
# unknown, while `size` and `length` forward to the collector's enumerator.
function IteratorSize(::Type{AsyncGenerator})
    return SizeUnknown()
end

function IteratorEltype(::Type{AsyncGenerator})
    return EltypeUnknown()
end

function size(itr::AsyncGenerator)
    return size(itr.collector.enumerator)
end

function length(itr::AsyncGenerator)
    return length(itr.collector.enumerator)
end
397

398
"""
399
    asyncmap!(f, results, c...; ntasks=0, batch_size=nothing)
400

401
Like [`asyncmap`](@ref), but stores output in `results` rather than
402
returning a collection.
403
"""
404
# Run an `AsyncCollector` to completion so that the mapped values end up in
# `r`, then return `r`.
function asyncmap!(f, r, c1, c...; ntasks=0, batch_size=nothing)
    collector = AsyncCollector(f, r, c1, c...; ntasks=ntasks, batch_size=batch_size)
    # Iterating is what drives the work; the yielded values are not used.
    for _ in collector
    end
    return r
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc