Multi-Core or Distributed Processing 🥉

An implementation of distributed memory 🧠 parallel computing is provided by module Distributed as part of the standard library 📚 shipped with Julia.

In [1]:
using Distributed

Julia provides a multiprocessing environment based on message passing to allow programs to run on multiple processes in separate memory 🧠 domains at once.

Communication in Julia is generally "one-sided", meaning that the programmer needs to explicitly manage only one process in a two-process operation. Furthermore, these operations typically do not look like "message 📨 send" and "message 📨 receive" but rather resemble higher-level operations like calls to user functions.
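
As a minimal sketch of what this looks like in practice (the call and the choice of worker are illustrative and assume a worker has been added with addprocs, as in the cells below), a remote operation reads like an ordinary function call rather than an explicit send/receive pair:

using Distributed
addprocs(1)                                      # ensure at least one worker exists
x = remotecall_fetch(+, first(workers()), 1, 2)  # evaluate 1 + 2 on a worker and return 3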

Managing worker processes 👷

Functions addprocs, rmprocs, workers 👷, and others are available as a programmatic means of adding, removing and querying the processes.

Module Distributed must be explicitly loaded on the master process before invoking addprocs. It is automatically made available on the worker 👷 processes.

In [2]:
addprocs(5) #Add 5 new worker processes
Out[2]:
5-element Array{Int64,1}:
 2
 3
 4
 5
 6
In [3]:
workers() #Show all worker processes
Out[3]:
5-element Array{Int64,1}:
 2
 3
 4
 5
 6
In [4]:
rmprocs(6,5) #Remove workers 6 and 5
workers()
Out[4]:
3-element Array{Int64,1}:
 2
 3
 4

🧐 Distributed programming in Julia is built on two primitives: remote references 📚 and remote calls 📞.

Remote references 📚

A remote reference is an object that can be used from any process to refer to an object stored on a particular process.

Remote references come in two flavors: Future 🔮 and RemoteChannel 🕳️.

A remote call returns a Future 🔮 to its result. Remote calls return immediately; the process that made the call proceeds to its next operation while the remote call happens somewhere else.

You can wait for a remote call to finish by calling wait on the returned Future 🔮, and you can obtain the full value of the result using fetch.
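
A minimal sketch (assuming a worker with id 2 has been added, as above; the call is illustrative):

f = remotecall(sqrt, 2, 4.0)  # returns a Future immediately
wait(f)                       # block until the remote call has finished
fetch(f)                      # obtain the result, 2.0 (fetch also waits if necessary)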

RemoteChannels 🕳️ are rewritable, whereas a Future 🔮 can be set only once. For example, multiple processes can coordinate their processing by referencing the same remote Channel.
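
For example (a sketch; the channel size and worker choice are arbitrary), any process holding the handle can put! and take! values repeatedly:

ch = RemoteChannel(() -> Channel{Int}(10))             # backing Channel lives on process 1
put!(ch, 1)                                            # write from the master process
remotecall_fetch(c -> take!(c), first(workers()), ch)  # take the value on a worker
put!(ch, 2)                                            # unlike a Future, it can be written again
take!(ch)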

Remote calls 📞

A remote call is a request by one process to call a certain function on certain arguments on another (possibly the same) process.

The first argument to remotecall 📞 is the function to call, the second is the id of the process that will do the work, and the remaining arguments are passed to the function being called. Most parallel programming in Julia does not reference specific processes or the number of processes available; remotecall 📞 is a low-level interface providing finer control 🎮.

In [5]:
r = remotecall(rand, 2, 3, 4)
Out[5]:
Future(2, 1, 7, nothing)
In [6]:
s = @spawnat 2 1 .+ fetch(r)
Out[6]:
Future(2, 1, 8, nothing)
In [7]:
fetch(s)
Out[7]:
3×4 Array{Float64,2}:
 1.4976   1.26589  1.61851  1.26621
 1.9398   1.7587   1.63158  1.87834
 1.71603  1.21418  1.6479   1.06598

As you can see, in the first line we asked process 2 to construct a 3-by-4 random 🔀 matrix, and in the second line we asked it to add 1 to it. The result of both calculations 🧮 is available in the two futures, r and s. The @spawnat macro evaluates the expression in the second argument on the process specified by the first argument.
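
If only a small part of a remotely computed value is needed, a sketch like the following (the lambda and the indices are illustrative) avoids copying the whole matrix back to the master: remotecall_fetch behaves like fetch(remotecall(...)) but is more efficient, and calling fetch on the Future r on worker 2, where its value lives, is a cheap local operation.

remotecall_fetch(fut -> fetch(fut)[1, 1], 2, r)  # returns only the [1,1] element of the matrix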

Channels and RemoteChannels 🕳️

  • A Channel 🕳️ is local to a process. Worker 👷 2 cannot directly refer to a Channel on worker 👷 3 and vice-versa. A RemoteChannel 🕳️, however, can put and take values across workers.
  • A RemoteChannel 🕳️ can be thought of as a handle to a Channel 🕳️.
  • The process id, pid, associated with a RemoteChannel 🕳️ identifies the process where the backing store, i.e., the backing Channel 🕳️, exists.
  • Any process with a reference to a RemoteChannel 🕳️ can put and take items from the channel. Data is automatically sent to (or retrieved from) the process a RemoteChannel 🕳️ is associated with.
  • Serializing a Channel 🕳️ also serializes any data present in the channel. Deserializing it therefore effectively makes a copy of the original object.
  • On the other hand, serializing a RemoteChannel 🕳️ only involves the serialization of an identifier that identifies the location and instance of the Channel 🕳️ referred to by the handle. A deserialized RemoteChannel 🕳️ object (on any worker 👷) therefore points to the same backing store as the original.

The channels example from above can be modified for interprocess communication.

Jobs, identified by an id (job_id), are written to the channel. Each remotely executing task in this simulation reads a job_id, waits for a random 🔀 amount of time ⏱️ and writes back a tuple of job_id, time taken and its own pid to the results channel. Finally all the results are printed out on the master process.

In [8]:
const jobs = RemoteChannel(()->Channel{Int}(32));
const results = RemoteChannel(()->Channel{Tuple}(32));
In [9]:
@everywhere function do_work(jobs, results) # define work function everywhere
   while true
       job_id = take!(jobs)
       exec_time = rand()
       sleep(exec_time) # simulates elapsed time doing actual work
       put!(results, (job_id, exec_time, myid()))
   end
end

function make_jobs(n)
   for i in 1:n
       put!(jobs, i)
   end
end;
In [10]:
n = 12;
@async make_jobs(n); # feed the jobs channel with "n" jobs

for p in workers() # start tasks on the workers to process requests in parallel
   remote_do(do_work, p, jobs, results)
end
In [11]:
@elapsed while n > 0 # print out results
   job_id, exec_time, where = take!(results)
   println("$job_id finished in $(round(exec_time; digits=2)) seconds on worker $where")
   global n = n - 1
end
3 finished in 0.06 seconds on worker 4
1 finished in 0.13 seconds on worker 2
4 finished in 0.03 seconds on worker 4
5 finished in 0.11 seconds on worker 2
6 finished in 0.26 seconds on worker 4
2 finished in 0.5 seconds on worker 3
8 finished in 0.5 seconds on worker 4
10 finished in 0.02 seconds on worker 4
7 finished in 0.87 seconds on worker 2
9 finished in 0.77 seconds on worker 3
11 finished in 0.72 seconds on worker 4
12 finished in 0.76 seconds on worker 2
Out[11]:
2.354414809

Parallel Map and Loops 🔁

We can use @spawnat to flip coins on two processes. First, define the following count_heads function on all processes with @everywhere:

In [12]:
@everywhere function count_heads(n)
    c::Int = 0
    for i = 1:n
        c += rand(Bool)
    end
    c
end
In [13]:
function sum_heads() # named to avoid shadowing Base.sum
    a = @spawnat 1 count_heads(100000000)
    b = @spawnat 2 count_heads(100000000)
    fetch(a) + fetch(b)
end
@time sum_heads()
  0.844747 seconds (83.96 k allocations: 4.359 MiB)
Out[13]:
99995071
In [14]:
@time a = count_heads(200000000)
  1.556201 seconds (1 allocation: 16 bytes)
Out[14]:
100005826

We used two explicit @spawnat statements, which limits the parallelism to two processes. To run on any number of processes, we can use a parallel for loop 🔁, running in distributed memory, which can be written in Julia using @distributed like this:

In [15]:
@time nheads = @distributed (+) for i = 1:200000000
    Int(rand(Bool))
end
  1.326883 seconds (58.94 k allocations: 2.927 MiB)
Out[15]:
99998369

This construct implements the pattern of assigning iterations to multiple processes, and combining them with a specified reduction (in this case (+)). The result of each iteration is taken as the value of the last expression inside the loop 🔁. The whole parallel loop 🔁 expression itself evaluates to the final answer.

Note that although parallel for loops look like serial for loops 🔁, their behavior is dramatically different. In particular, the iterations do not happen in a specified order, and writes to variables or arrays will not be globally visible since iterations run on different processes.

Any variables used inside the parallel loop 🔁 will be copied and broadcast to each process.

For example, the following code will not work as intended:

In [16]:
a = zeros(10)
@distributed for i = 1:10
    a[i] = i
end
a
Out[16]:
10-element Array{Float64,1}:
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0

This code will not initialize all of a, since each process will have a separate copy of it. Parallel for loops 🔁 like these must be avoided.

Fortunately, Shared Arrays can be used to get around this limitation:

In [17]:
using SharedArrays

a = SharedArray{Float64}(10)
@distributed for i = 1:10
    a[i] = i
end
a
Out[17]:
10-element SharedArray{Float64,1}:
  1.0
  2.0
  3.0
  4.0
  5.0
  6.0
  7.0
  8.0
  9.0
 10.0

SharedArrays will be explained below.

Using "outside" variables in parallel loops 🔁 is perfectly reasonable if the variables are read-only 👓:

In [18]:
b = randn(10)
@distributed for i = 1:10
    println(b[i])
end
Out[18]:
Task (runnable) @0x00007f10bfe27340

The loop returns its Task immediately; the values printed by the workers arrive asynchronously, which is why the "From worker" lines show up further down, interleaved with the output of a later cell.
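
Besides parallel loops, Distributed also provides a parallel map. When each unit of work is large and no reduction is needed, pmap applies a function to every element of a collection, handing the elements out to the workers. The matrices and the use of svdvals below are only an illustration (a sketch, not part of the original run):

using Distributed, LinearAlgebra
@everywhere using LinearAlgebra      # make svdvals available on the workers
M = [rand(100, 100) for _ in 1:10]   # ten random matrices
s = pmap(svdvals, M)                 # one svdvals call per matrix, spread over the workers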

Shared Arrays 🖇️

Shared Arrays use system shared memory 🧠 to map the same array across many processes. In a SharedArray 🖇️ each "participating" process has access to the entire array. A SharedArray 🖇️ is a good choice when you want to have a large amount of data jointly accessible to two or more processes on the same machine.

The constructor for a shared array is of the form:

SharedArray{T,N}(dims::NTuple; init=false, pids=Int[])

This creates an N-dimensional shared array of a bits type T and size dims across the processes specified by pids. A shared array is accessible only from the participating workers specified by the pids named argument (and from the creating process, if it is on the same host).

If an init function, of signature initfn(S::SharedArray), is specified, it is called on all the participating workers 👷. You can specify that each worker runs the init function on a distinct portion of the array, thereby parallelizing initialization.

In [19]:
@everywhere using SharedArrays
S = SharedArray{Int,2}((3,4), init = S -> S[localindices(S)] = repeat([myid()], length(localindices(S))))
      From worker 2:	0.028566486513405803
      From worker 2:	0.08741719073442625
      From worker 2:	-0.9475990413661702
      From worker 2:	-0.4165271807588755
      From worker 4:	0.7835690894410531
      From worker 4:	-0.14577964749827468
      From worker 4:	0.0858166350405183
      From worker 3:	0.25742333881797125
      From worker 3:	-1.5288476236166226
      From worker 3:	0.04238308914446343
Out[19]:
3×4 SharedArray{Int64,2}:
 2  2  3  4
 2  3  3  4
 2  3  4  4

SharedArray 🖇️ indexing (assignment and accessing values) works just as with regular arrays, and is efficient because the underlying memory 🧠 is available to the local process 👷.

In [20]:
S[3,2] = 7
S
Out[20]:
3×4 SharedArray{Int64,2}:
 2  2  3  4
 2  3  3  4
 2  7  4  4

Since all processes have access to the underlying data, you do have to be careful not to set up conflicts. In the example below, each process fills the entire array with its own pid; for any particular element of S, whichever process writes last wins, so repeating the same code can give a different result (a sketch of one way to avoid the conflict follows the two runs):

In [21]:
@sync begin
    for p in procs(S)
        @async begin
            remotecall_wait(fill!, p, S, p)  #Perform wait(remotecall(...))
        end
    end
end
S
Out[21]:
3×4 SharedArray{Int64,2}:
 3  3  2  3
 3  2  2  3
 3  2  3  3
In [22]:
@sync begin
    for p in procs(S)
        @async begin
            remotecall_wait(fill!, p, S, p)  #Perform wait(remotecall(...))
        end
    end
end
S
Out[22]:
3×4 SharedArray{Int64,2}:
 3  3  2  2
 3  2  2  2
 3  2  2  2
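
One way to avoid such conflicts (a sketch that is not part of the original notebook; it reuses localindices from the constructor example above) is to have each participating process write only to the part of S that it "owns":

@sync for p in procs(S)
    @async remotecall_wait(s -> (s[localindices(s)] .= myid(); nothing), p, S)
end
S  # every element is written by exactly one process, so repeated runs agree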

The parallel loop 🔁 example from above can be better understood now. Keep in mind that a bare @distributed loop 🔁 without a reduction returns a Task right away, so the array may still show its initial zeros if it is displayed before the workers have finished writing, as happens in the run below; a sketch of how to wait for completion follows the output.

In [23]:
using SharedArrays

a = SharedArray{Float64}(10)
@distributed for i = 1:10
    a[i] = i
end
a
Out[23]:
10-element SharedArray{Float64,1}:
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
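
The zeros here are most likely the asynchrony noted above: the @distributed loop 🔁 returns immediately, so a is displayed before the workers have written into it. A minimal sketch of the fix (assuming the worker and SharedArrays setup from above) is to wait for the loop with @sync:

using Distributed, SharedArrays

a = SharedArray{Float64}(10)
@sync @distributed for i = 1:10
    a[i] = i            # each worker writes its own range of indices
end
a                       # now reliably holds 1.0, 2.0, …, 10.0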