An implementation of distributed memory 🧠 parallel computing is provided by the module Distributed, part of the standard library 📚 shipped with Julia.
using Distributed
Julia provides a multiprocessing environment based on message passing to allow programs to run on multiple processes in separate memory 🧠 domains at once.
Communication in Julia is generally "one-sided", meaning that the programmer needs to explicitly manage only one process in a two-process operation. Furthermore, these operations typically do not look like "message 📨 send" and "message 📨 receive" but rather resemble higher-level operations like calls to user functions.
Functions addprocs, rmprocs, workers 👷, and others are available as a programmatic means of adding, removing and querying the processes.
Module Distributed must be explicitly loaded on the master process before invoking addprocs. It is automatically made available on the worker 👷 processes.
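Worker processes can also be launched together with Julia itself: the -p command-line flag starts the given number of workers at startup and implicitly loads Distributed on the master process (the worker count below is just an example; -p auto starts one worker per logical CPU core):
julia -p 4 # start Julia with 4 worker processes; implies `using Distributed`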
addprocs(5) #Add 5 new worker processes
5-element Array{Int64,1}:
 2
 3
 4
 5
 6
workers() #Show all worker processes
5-element Array{Int64,1}:
 2
 3
 4
 5
 6
rmprocs(6,5) # Remove workers 6 and 5
workers()
3-element Array{Int64,1}:
 2
 3
 4
🧐 Distributed programming in Julia is built on two primitives: remote references 📚 and remote calls 📞.
A remote reference is an object that can be used from any process to refer to an object stored on a particular process.
Remote references come in two flavors: Future 🔮 and RemoteChannel 🕳️.
A remote call returns a Future 🔮 to its result. Remote calls return immediately; the process that made the call proceeds to its next operation while the remote call happens somewhere else.
You can wait for a remote call to finish by calling wait on the returned Future 🔮, and you can obtain the full value of the result using fetch.
RemoteChannels 🕳️ are rewritable. For example, multiple processes can co-ordinate their processing by referencing the same remote Channel.
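As a minimal sketch of the difference (the channel size and values below are arbitrary), a Future 🔮 is set exactly once, whereas a RemoteChannel 🕳️ can be written to and read from repeatedly, from any process:
c = RemoteChannel(()->Channel{Int}(10), 2) # the backing Channel lives on process 2
put!(c, 1) # write to the remote channel...
put!(c, 2) # ...as many times as needed
take!(c)   # returns 1; a subsequent take!(c) returns 2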
A remote call is a request by one process to call a certain function on certain arguments on another (possibly the same) process.
The first argument to remotecall 📞 is the function to call, the second is the id of the process that will do the work, and the remaining arguments are passed to the function being called. Most parallel programming in Julia does not reference specific processes or the number of processes available; remotecall 📞 is considered a low-level interface providing finer control 🎮.
r = remotecall(rand, 2, 3, 4)
Future(2, 1, 7, nothing)
s = @spawnat 2 1 .+ fetch(r)
Future(2, 1, 8, nothing)
fetch(s)
3×4 Array{Float64,2}:
 1.4976   1.26589  1.61851  1.26621
 1.9398   1.7587   1.63158  1.87834
 1.71603  1.21418  1.6479   1.06598
As you can see, in the first line we asked process 2 to construct a 3-by-4 random 🔀 matrix, and in the second line we asked it to add 1 to it. The result of both calculations 🧮 is available in the two futures, r and s. The @spawnat macro evaluates the expression in the second argument on the process specified by the first argument.
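When the result is wanted immediately, the call and the fetch can be combined: remotecall_fetch(f, id, args...) is equivalent to fetch(remotecall(f, id, args...)) but more efficient. For example:
remotecall_fetch(rand, 2, 2, 2) # generate a 2×2 random matrix on process 2 and fetch its value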
The channels example from above can be modified for interprocess communication.
Jobs, identified by an id (job_id), are written to the channel. Each remotely executing task in this simulation reads a job_id, waits for a random 🔀 amount of time ⏱️ and writes back a tuple of job_id, time taken and its own pid to the results channel. Finally all the results are printed out on the master process.
const jobs = RemoteChannel(()->Channel{Int}(32));
const results = RemoteChannel(()->Channel{Tuple}(32));
@everywhere function do_work(jobs, results) # define work function everywhere
    while true
        job_id = take!(jobs)
        exec_time = rand()
        sleep(exec_time) # simulates elapsed time doing actual work
        put!(results, (job_id, exec_time, myid()))
    end
end
function make_jobs(n)
    for i in 1:n
        put!(jobs, i)
    end
end;
n = 12;
@async make_jobs(n); # feed the jobs channel with "n" jobs
for p in workers() # start tasks on the workers to process requests in parallel
    remote_do(do_work, p, jobs, results) # remote_do runs do_work on p asynchronously, without returning a result
end
@elapsed while n > 0 # print out results
    job_id, exec_time, where = take!(results)
    println("$job_id finished in $(round(exec_time; digits=2)) seconds on worker $where")
    global n = n - 1
end
3 finished in 0.06 seconds on worker 4
1 finished in 0.13 seconds on worker 2
4 finished in 0.03 seconds on worker 4
5 finished in 0.11 seconds on worker 2
6 finished in 0.26 seconds on worker 4
2 finished in 0.5 seconds on worker 3
8 finished in 0.5 seconds on worker 4
10 finished in 0.02 seconds on worker 4
7 finished in 0.87 seconds on worker 2
9 finished in 0.77 seconds on worker 3
11 finished in 0.72 seconds on worker 4
12 finished in 0.76 seconds on worker 2
2.354414809
We can use @spawnat to flip coins on two processes. First, define the function count_heads, which simply adds together n random booleans:
@everywhere function count_heads(n)
    c::Int = 0
    for i = 1:n
        c += rand(Bool)
    end
    c
end
function parallel_count_heads() # spawn count_heads on two processes and add the results (avoid naming this sum, which would shadow Base.sum)
    a = @spawnat 1 count_heads(100000000)
    b = @spawnat 2 count_heads(100000000)
    fetch(a) + fetch(b)
end
@time parallel_count_heads()
0.844747 seconds (83.96 k allocations: 4.359 MiB)
99995071
@time a = count_heads(200000000)
1.556201 seconds (1 allocation: 16 bytes)
100005826
We used two explicit @spawnat statements, which limits the parallelism to two processes. To run on any number of processes, we can use a parallel for loop 🔁, running in distributed memory, which can be written in Julia using @distributed like this:
@time nheads = @distributed (+) for i = 1:200000000
    Int(rand(Bool))
end
1.326883 seconds (58.94 k allocations: 2.927 MiB)
99998369
This construct implements the pattern of assigning iterations to multiple processes, and combining them with a specified reduction (in this case (+)). The result of each iteration is taken as the value of the last expression inside the loop 🔁. The whole parallel loop 🔁 expression itself evaluates to the final answer.
Note that although parallel for loops look like serial for loops 🔁, their behavior is dramatically different. In particular, the iterations do not happen in a specified order, and writes to variables or arrays will not be globally visible since iterations run on different processes.
Any variables used inside the parallel loop 🔁 will be copied and broadcast to each process.
For example, the following code will not work as intended:
a = zeros(10)
@distributed for i = 1:10
    a[i] = i
end
a
10-element Array{Float64,1}:
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
This code will not initialize all of a, since each process will have a separate copy of it. Parallel for loops 🔁 like these must be avoided.
Fortunately, Shared Arrays can be used to get around this limitation:
using SharedArrays
a = SharedArray{Float64}(10)
@distributed for i = 1:10
    a[i] = i
end
a
10-element SharedArray{Float64,1}:
  1.0
  2.0
  3.0
  4.0
  5.0
  6.0
  7.0
  8.0
  9.0
 10.0
SharedArray will be explained below.
Using "outside" variables in parallel loops 🔁 is perfectly reasonable if the variables are read-only 👓:
b = randn(10)
@distributed for i = 1:10
    println(b[i])
end
Task (runnable) @0x00007f10bfe27340
      From worker 2:    0.028566486513405803
      From worker 2:    0.08741719073442625
      From worker 2:    -0.9475990413661702
      From worker 2:    -0.4165271807588755
      From worker 4:    0.7835690894410531
      From worker 4:    -0.14577964749827468
      From worker 4:    0.0858166350405183
      From worker 3:    0.25742333881797125
      From worker 3:    -1.5288476236166226
      From worker 3:    0.04238308914446343
Shared Arrays use system shared memory 🧠 to map the same array across many processes. In a SharedArray 🖇️ each "participating" process has access to the entire array. A SharedArray 🖇️ is a good choice when you want to have a large amount of data jointly accessible to two or more processes on the same machine.
The constructor for a shared array is of the form:
SharedArray{T,N}(dims::NTuple; init=false, pids=Int[])
This creates an N-dimensional shared array of a bits type T and size dims across the processes specified by pids. A shared array is accessible only from those participating workers specified by the pids named argument (and the creating process too, if it is on the same host).
If an init function, of signature initfn(S::SharedArray), is specified, it is called on all the participating workers 👷. You can specify that each worker runs the init function on a distinct portion of the array, thereby parallelizing initialization.
@everywhere using SharedArrays
S = SharedArray{Int,2}((3,4), init = S -> S[localindices(S)] = repeat([myid()], length(localindices(S))))
3×4 SharedArray{Int64,2}:
 2  2  3  4
 2  3  3  4
 2  3  4  4
SharedArray 🖇️ indexing (assignment and accessing values) works just as with regular arrays, and is efficient because the underlying memory 🧠 is available to the local process 👷.
S[3,2] = 7
S
3×4 SharedArray{Int64,2}:
 2  2  3  4
 2  3  3  4
 2  7  4  4
Since all processes have access to the underlying data, you do have to be careful not to set up conflicts. For example:
@sync begin
    for p in procs(S)
        @async begin
            remotecall_wait(fill!, p, S, p) # performs wait(remotecall(fill!, p, S, p))
        end
    end
end
S
3×4 SharedArray{Int64,2}:
 3  3  2  3
 3  2  2  3
 3  2  3  3
Running the same block a second time can produce a different result, because the processes race to fill the array and the outcome depends on which fill! happens to finish last for each element:
@sync begin
    for p in procs(S)
        @async begin
            remotecall_wait(fill!, p, S, p) # performs wait(remotecall(fill!, p, S, p))
        end
    end
end
S
3×4 SharedArray{Int64,2}:
 3  3  2  2
 3  2  2  2
 3  2  2  2
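One way to avoid such conflicts, sketched here along the lines of the localindices-based init above, is to partition the array so that each process writes only to the part it "owns":
@sync begin
    for p in procs(S)
        @async begin
            # each worker writes only the linear indices it owns, so no two processes conflict
            remotecall_wait(A -> (A[localindices(A)] .= myid(); A), p, S)
        end
    end
end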
With SharedArrays introduced, the parallel loop 🔁 example from above can now be better understood:
using SharedArrays
a = SharedArray{Float64}(10)
@distributed for i = 1:10
    a[i] = i
end
a
10-element SharedArray{Float64,1}:
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
Note that a may still show zeros here: without a reduction operator, @distributed executes asynchronously, spawning independent tasks on the workers and returning immediately, so the array was inspected before the workers had written their values.
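To wait for the loop to complete before reading the array, prefix the call with @sync:
a = SharedArray{Float64}(10)
@sync @distributed for i = 1:10
    a[i] = i
end
a # now guaranteed to contain 1.0, 2.0, …, 10.0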