• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JuliaLang / julia / #38182

15 Aug 2025 03:55AM UTC coverage: 77.87% (-0.4%) from 78.28%
#38182

push

local

web-flow
🤖 [master] Bump the SparseArrays stdlib from 30201ab to bb5ecc0 (#59263)

Stdlib: SparseArrays
URL: https://github.com/JuliaSparse/SparseArrays.jl.git
Stdlib branch: main
Julia branch: master
Old commit: 30201ab
New commit: bb5ecc0
Julia version: 1.13.0-DEV
SparseArrays version: 1.13.0
Bump invoked by: @ViralBShah
Powered by:
[BumpStdlibs.jl](https://github.com/JuliaLang/BumpStdlibs.jl)

Diff:
https://github.com/JuliaSparse/SparseArrays.jl/compare/30201abcb...bb5ecc091

```
$ git log --oneline 30201ab..bb5ecc0
bb5ecc0 fast quadratic form for dense matrix, sparse vectors (#640)
34ece87 Extend 3-arg `dot` to generic `HermOrSym` sparse matrices (#643)
095b685 Exclude unintended complex symmetric sparse matrices from 3-arg `dot` (#642)
8049287 Fix signature for 2-arg matrix-matrix `dot` (#641)
cff971d Make cond(::SparseMatrix, 1 / Inf) discoverable from 2-norm error (#629)
```

Co-authored-by: ViralBShah <744411+ViralBShah@users.noreply.github.com>

48274 of 61993 relevant lines covered (77.87%)

9571166.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.83
/base/asyncmap.jl
1
# This file is a part of Julia. License is MIT: https://julialang.org/license
2

3
using Base.Iterators: Enumerate
4

5
"""
6
    asyncmap(f, c...; ntasks=0, batch_size=nothing)
7

8
Uses multiple concurrent tasks to map `f` over a collection (or multiple
9
equal length collections). For multiple collection arguments, `f` is
10
applied elementwise.
11

12
The output is guaranteed to be the same order as the elements of the collection(s) `c`.
13

14
`ntasks` specifies the number of tasks to run concurrently.
15
Depending on the length of the collections, if `ntasks` is unspecified,
16
up to 100 tasks will be used for concurrent mapping.
17

18
`ntasks` can also be specified as a zero-arg function. In this case, the
19
number of tasks to run in parallel is checked before processing every element and a new
20
task started if the value of `ntasks_func` is greater than the current number
21
of tasks.
22

23
If `batch_size` is specified, the collection is processed in batch mode. `f` must
24
then be a function that must accept a `Vector` of argument tuples and must
25
return a vector of results. The input vector will have a length of `batch_size` or less.
26

27
The following examples highlight execution in different tasks by returning
28
the `objectid` of the tasks in which the mapping function is executed.
29

30
First, with `ntasks` undefined, each element is processed in a different task.
31
```julia-repl
32
julia> tskoid() = objectid(current_task());
33

34
julia> asyncmap(x->tskoid(), 1:5)
35
5-element Vector{UInt64}:
36
 0x6e15e66c75c75853
37
 0x440f8819a1baa682
38
 0x9fb3eeadd0c83985
39
 0xebd3e35fe90d4050
40
 0x29efc93edce2b961
41

42
julia> length(unique(asyncmap(x->tskoid(), 1:5)))
43
5
44
```
45

46
With `ntasks=2` all elements are processed in 2 tasks.
47
```julia-repl
48
julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
49
5-element Vector{UInt64}:
50
 0x027ab1680df7ae94
51
 0xa23d2f80cd7cf157
52
 0x027ab1680df7ae94
53
 0xa23d2f80cd7cf157
54
 0x027ab1680df7ae94
55

56
julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
57
2
58
```
59

60
With `batch_size` defined, the mapping function needs to be changed to accept an array
61
of argument tuples and return an array of results. `map` is used in the modified mapping
62
function to achieve this.
63
```julia-repl
64
julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
65
batch_func (generic function with 1 method)
66

67
julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
68
5-element Vector{String}:
69
 "args_tuple: (1,), element_val: 1, task: 9118321258196414413"
70
 "args_tuple: (2,), element_val: 2, task: 4904288162898683522"
71
 "args_tuple: (3,), element_val: 3, task: 9118321258196414413"
72
 "args_tuple: (4,), element_val: 4, task: 4904288162898683522"
73
 "args_tuple: (5,), element_val: 5, task: 9118321258196414413"
74
```
75
"""
76
function asyncmap(f, c...; ntasks=0, batch_size=nothing)
625✔
77
    return async_usemap(f, c...; ntasks=ntasks, batch_size=batch_size)
621✔
78
end
79

80
function async_usemap(f, c...; ntasks=0, batch_size=nothing)
1,085✔
81
    ntasks = verify_ntasks(c[1], ntasks)
586✔
82
    batch_size = verify_batch_size(batch_size)
585✔
83

84
    if batch_size !== nothing
583✔
85
        exec_func = batch -> begin
436✔
86
            # extract the Refs from the input tuple
87
            batch_refs = map(x->x[1], batch)
1,348✔
88

89
            # and the args tuple....
90
            batched_args = map(x->x[2], batch)
1,348✔
91

92
            results = f(batched_args)
426✔
93
            foreach(x -> (batch_refs[x[1]].x = x[2]), enumerate(results))
1,348✔
94
        end
95
    else
96
        exec_func = (r,args) -> (r.x = f(args...))
6,801✔
97
    end
98
    chnl, worker_tasks = setup_chnl_and_tasks(exec_func, ntasks, batch_size)
583✔
99
    return wrap_n_exec_twice(chnl, worker_tasks, ntasks, exec_func, c...)
583✔
100
end
101

102
batch_size_err_str(batch_size) = string("batch_size must be specified as a positive integer. batch_size=", batch_size)
2✔
103
function verify_batch_size(batch_size)
6✔
104
    if batch_size === nothing
590✔
105
        return batch_size
578✔
106
    elseif isa(batch_size, Number)
12✔
107
        batch_size = Int(batch_size)
11✔
108
        batch_size < 1 && throw(ArgumentError(batch_size_err_str(batch_size)))
11✔
109
        return batch_size
10✔
110
    else
111
        throw(ArgumentError(batch_size_err_str(batch_size)))
1✔
112
    end
113
end
114

115

116
function verify_ntasks(iterable, ntasks)
6✔
117
    if !((isa(ntasks, Number) && (ntasks >= 0)) || isa(ntasks, Function))
591✔
118
        err = string("ntasks must be specified as a positive integer or a 0-arg function. ntasks=", ntasks)
1✔
119
        throw(ArgumentError(err))
1✔
120
    end
121

122
    if ntasks == 0
590✔
123
        if haslength(iterable)
485✔
124
            ntasks = max(1,min(100, length(iterable)))
483✔
125
        else
126
            ntasks = 100
2✔
127
        end
128
    end
129
    return ntasks
590✔
130
end
131

132
function wrap_n_exec_twice(chnl, worker_tasks, ntasks, exec_func, c...)
6✔
133
    # The driver task, creates a Ref object and writes it and the args tuple to
134
    # the communication channel for processing by a free worker task.
135
    push_arg_to_channel = (x...) -> (r=Ref{Any}(nothing); put!(chnl,(r,x));r)
9,593✔
136

137
    if isa(ntasks, Function)
583✔
138
        map_f = (x...) -> begin
5,293✔
139
            # check number of tasks every time, and start one if required.
140
            # number_tasks > optimal_number is fine, the other way around is inefficient.
141
            if length(worker_tasks) < ntasks()
9,452✔
142
                start_worker_task!(worker_tasks, exec_func, chnl)
7✔
143
            end
144
            push_arg_to_channel(x...)
5,192✔
145
        end
146
    else
147
        map_f = push_arg_to_channel
482✔
148
    end
149
    maptwice(map_f, chnl, worker_tasks, c...)
583✔
150
end
151

152
function maptwice(wrapped_f, chnl, worker_tasks, c...)
583✔
153
    # first run, returns a collection of Refs
154
    asyncrun_excp = nothing
583✔
155
    local asyncrun
156
    try
583✔
157
        asyncrun = map(wrapped_f, c...)
583✔
158
    catch ex
159
        if isa(ex,InvalidStateException)
×
160
            # channel could be closed due to exceptions in the async tasks,
161
            # we propagate those errors, if any, over the `put!` failing
162
            # in asyncrun due to a closed channel.
163
            asyncrun_excp = ex
×
164
        else
165
            rethrow()
×
166
        end
167
    end
168

169
    # close channel and wait for all worker tasks to finish
170
    close(chnl)
583✔
171

172
    # check and throw any exceptions from the worker tasks
173
    foreach(x->(v=fetch(x); isa(v, Exception) && throw(v)), worker_tasks)
2,718✔
174

175
    # check if there was a genuine problem with asyncrun
176
    (asyncrun_excp !== nothing) && throw(asyncrun_excp)
580✔
177

178
    if isa(asyncrun, Ref)
580✔
179
        # scalar case
180
        return asyncrun.x
1✔
181
    else
182
        # second run, extract values from the Refs and return
183
        return map(ref->ref.x, asyncrun)
7,583✔
184
    end
185
end
186

187
function setup_chnl_and_tasks(exec_func, ntasks, batch_size=nothing)
130✔
188
    if isa(ntasks, Function)
588✔
189
        nt = ntasks()::Int
154✔
190
        # start at least one worker task.
191
        if nt == 0
102✔
192
            nt = 1
1✔
193
        end
194
    else
195
        nt = ntasks::Int
486✔
196
    end
197

198
    # Use an unbuffered channel for communicating with the worker tasks. In the event
199
    # of an error in any of the worker tasks, the channel is closed. This
200
    # results in the `put!` in the driver task failing immediately.
201
    chnl = Channel(0)
588✔
202
    worker_tasks = []
588✔
203
    foreach(_ -> start_worker_task!(worker_tasks, exec_func, chnl, batch_size), 1:nt)
2,767✔
204
    yield()
588✔
205
    return (chnl, worker_tasks)
588✔
206
end
207

208
function start_worker_task!(worker_tasks, exec_func, chnl, batch_size=nothing)
209
    t = @async begin
4,379✔
210
        retval = nothing
2,186✔
211

212
        try
2,186✔
213
            if isa(batch_size, Number)
2,186✔
214
                while isopen(chnl)
476✔
215
                    # The mapping function expects an array of input args, as it processes
216
                    # elements in a batch.
217
                    batch_collection=Any[]
434✔
218
                    n = 0
434✔
219
                    for exec_data in chnl
434✔
220
                        push!(batch_collection, exec_data)
922✔
221
                        n += 1
922✔
222
                        (n == batch_size) && break
922✔
223
                    end
501✔
224
                    if n > 0
434✔
225
                        exec_func(batch_collection)
426✔
226
                    end
227
                end
434✔
228
            else
229
                for exec_data in chnl
2,144✔
230
                    exec_func(exec_data...)
6,243✔
231
                end
6,285✔
232
            end
233
        catch e
234
            close(chnl)
3✔
235
            retval = capture_exception(e, catch_backtrace())
3✔
236
        end
237
        retval
2,186✔
238
    end
239
    push!(worker_tasks, t)
2,186✔
240
end
241

242
# Special handling for some types.
243
function asyncmap(f, s::AbstractString; kwargs...)
4✔
244
    s2 = Vector{Char}(undef, length(s))
2✔
245
    asyncmap!(f, s2, s; kwargs...)
2✔
246
    return String(s2)
2✔
247
end
248

249
# map on a single BitArray returns a BitArray if the mapping function is boolean.
250
function asyncmap(f, b::BitArray; kwargs...)
8✔
251
    b2 = async_usemap(f, b; kwargs...)
4✔
252
    if eltype(b2) == Bool
4✔
253
        return BitArray(b2)
2✔
254
    end
255
    return b2
2✔
256
end
257

258
mutable struct AsyncCollector
259
    f
260
    results
261
    enumerator::Enumerate
262
    ntasks
263
    batch_size
264
    nt_check::Bool     # check number of tasks on every iteration
265

266
    AsyncCollector(f, r, en::Enumerate, ntasks, batch_size) = new(f, r, en, ntasks, batch_size, isa(ntasks, Function))
5✔
267
end
268

269
"""
270
    AsyncCollector(f, results, c...; ntasks=0, batch_size=nothing) -> iterator
271

272
Return an iterator which applies `f` to each element of `c` asynchronously
273
and collects output into `results`.
274

275
Keyword args `ntasks` and `batch_size` have the same behavior as in
276
[`asyncmap`](@ref). If `batch_size` is specified, `f` must
277
be a function which operates on an array of argument tuples.
278

279
!!! note
280
    `iterate(::AsyncCollector, state) -> (nothing, state)`. A successful return
281
    from `iterate` indicates that the next element from the input collection is
282
    being processed asynchronously. It blocks until a free worker task becomes
283
    available.
284

285
!!! note
286
    `for _ in AsyncCollector(f, results, c...; ntasks=1) end` is equivalent to
287
    `map!(f, results, c...)`.
288
"""
289
function AsyncCollector(f, results, c...; ntasks=0, batch_size=nothing)
5✔
290
    AsyncCollector(f, results, enumerate(zip(c...)), ntasks, batch_size)
5✔
291
end
292

293
mutable struct AsyncCollectorState
294
    chnl::Channel
295
    worker_tasks::Array{Task,1}
296
    enum_state      # enumerator state
297
    AsyncCollectorState(chnl::Channel, worker_tasks::Vector) =
4✔
298
        new(chnl, convert(Vector{Task}, worker_tasks))
299
end
300

301
function iterate(itr::AsyncCollector)
2✔
302
    itr.ntasks = verify_ntasks(itr.enumerator, itr.ntasks)
2✔
303
    itr.batch_size = verify_batch_size(itr.batch_size)
2✔
304

305
    chnl, worker_tasks = setup_chnl_and_tasks((i,args) -> (itr.results[i]=itr.f(args...)), itr.ntasks, itr.batch_size)
56✔
306
    return iterate(itr, AsyncCollectorState(chnl, worker_tasks))
2✔
307
end
308

309
function wait_done(itr::AsyncCollector, state::AsyncCollectorState)
2✔
310
    close(state.chnl)
2✔
311

312
    # wait for all tasks to finish
313
    foreach(x->(v=fetch(x); isa(v, Exception) && throw(v)), state.worker_tasks)
16✔
314
    empty!(state.worker_tasks)
2✔
315
end
316

317
function iterate(itr::AsyncCollector, state::AsyncCollectorState)
24✔
318
    if itr.nt_check && (length(state.worker_tasks) < itr.ntasks())
24✔
319
        start_worker_task!(state.worker_tasks, itr.f, state.chnl)
×
320
    end
321

322
    # Get index and mapped function arguments from enumeration iterator.
323
    y = isdefined(state, :enum_state) ?
26✔
324
        iterate(itr.enumerator, state.enum_state) :
325
        iterate(itr.enumerator)
326
    if y === nothing
24✔
327
        wait_done(itr, state)
2✔
328
        return nothing
2✔
329
    end
330
    (i, args), state.enum_state = y
22✔
331
    put!(state.chnl, (i, args))
22✔
332

333
    return (nothing, state)
22✔
334
end
335

336
"""
337
    AsyncGenerator(f, c...; ntasks=0, batch_size=nothing) -> iterator
338

339
Apply `f` to each element of `c` using at most `ntasks` asynchronous tasks.
340

341
Keyword args `ntasks` and `batch_size` have the same behavior as in
342
[`asyncmap`](@ref). If `batch_size` is specified, `f` must
343
be a function which operates on an array of argument tuples.
344

345
!!! note
346
    `collect(AsyncGenerator(f, c...; ntasks=1))` is equivalent to
347
    `map(f, c...)`.
348
"""
349
mutable struct AsyncGenerator
350
    collector::AsyncCollector
1✔
351
end
352

353
function AsyncGenerator(f, c...; ntasks=0)
1✔
354
    AsyncGenerator(AsyncCollector(f, Dict{Int,Any}(), c...; ntasks=ntasks))
1✔
355
end
356

357
mutable struct AsyncGeneratorState
358
    i::Int
359
    collector_done::Bool
360
    collector_state::AsyncCollectorState
361
    AsyncGeneratorState(i::Int) = new(i, false)
1✔
362
end
363

364
function iterate(itr::AsyncGenerator, state::AsyncGeneratorState=AsyncGeneratorState(0))
11✔
365
    state.i += 1
12✔
366

367
    results_dict = itr.collector.results
11✔
368
    while !state.collector_done && !haskey(results_dict, state.i)
21✔
369
        y = isdefined(state, :collector_state) ?
12✔
370
            iterate(itr.collector, state.collector_state) :
371
            iterate(itr.collector)
372
        if y === nothing
11✔
373
            # `check_done` waits for async tasks to finish. if we do not have the index
374
            # we are looking for, it is an error.
375
            state.collector_done = true
1✔
376
            break;
1✔
377
        end
378
        _, state.collector_state = y
10✔
379
    end
10✔
380
    state.collector_done && isempty(results_dict) && return nothing
11✔
381
    r = results_dict[state.i]
10✔
382
    delete!(results_dict, state.i)
10✔
383

384
    return (r, state)
10✔
385
end
386

387
# pass-through iterator traits to the iterable
388
# on which the mapping function is being applied
389
IteratorSize(::Type{AsyncGenerator}) = SizeUnknown()
1✔
390
IteratorEltype(::Type{AsyncGenerator}) = EltypeUnknown()
2✔
391
size(itr::AsyncGenerator) = size(itr.collector.enumerator)
×
392
length(itr::AsyncGenerator) = length(itr.collector.enumerator)
×
393

394
"""
395
    asyncmap!(f, results, c...; ntasks=0, batch_size=nothing)
396

397
Like [`asyncmap`](@ref), but stores output in `results` rather than
398
returning a collection.
399

400
$(_DOCS_ALIASING_WARNING)
401
"""
402
function asyncmap!(f, r, c1, c...; ntasks=0, batch_size=nothing)
4✔
403
    foreach(identity, AsyncCollector(f, r, c1, c...; ntasks=ntasks, batch_size=batch_size))
4✔
404
    r
4✔
405
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc