Parallel Programming using Julia 🤖

Coroutines (Tasks) 🥇

Julia's parallel programming platform uses Tasks to switch among multiple computations.

Tasks 💡 are a control flow feature that allows computations to be suspended and resumed in a flexible manner. This feature is sometimes called by other names, such as symmetric coroutines, lightweight threads, cooperative multitasking, or one-shot continuations.
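As a rough illustration of suspending and resuming, the sketch below runs two tasks that each hand control back to the scheduler with yield() halfway through. The names trace, t1 and t2 are made up for this example, and the exact interleaving may depend on the scheduler.

```julia
trace = String[]                 # records the order in which the steps run

t1 = @task begin
    push!(trace, "a1")
    yield()                      # suspend t1 so that other tasks can run
    push!(trace, "a2")           # when resumed, t1 continues from here
end

t2 = @task begin
    push!(trace, "b1")
    yield()
    push!(trace, "b2")
end

schedule(t1); schedule(t2)
wait(t1); wait(t2)               # block until both tasks have finished

println(trace)
```

Each task keeps its own position in the code, so "a2" always comes after "a1" even when steps of the other task run in between.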

For our first coroutine we will use a function that computes the first 🇳 values of the Fibonacci series 🐇 iteratively (a recursive version is also possible).

We will introduce a special notation to make it easier to see what the Julia code is doing:

Emoji Standard Process Notation (ESPN):

🅰️ = 0

🅱️ = 1

🔁 0 ... 🇳-3:

🅰️, 🅱️ = 🅱️, 🅰️ ➕ 🅱️
In [1]:
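function fib_w_parameters(n)
    # Print the first n values of the Fibonacci series 🐇 (the first two are always printed)
    println("Empezando el Calculo de Fibonacci")   # "Starting the Fibonacci calculation"
    a = 0
    b = 1
    # Each line reads "Number k of the Fibonacci series: value"
    println("Numero ", 1,  " de la serie de fibonacci: ",a)
    println("Numero ", 2,  " de la serie de fibonacci: ",b)
    for t=0:n-3
        a, b = b, a+b
        println("Numero ", t+3,  " de la serie de fibonacci: ",b)
    end
    println("Llegamos al ultimo calculo de Fibonacci")   # "We reached the last Fibonacci calculation"
end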
Out[1]:
fib_w_parameters (generic function with 1 method)

@task Macro

@task

The @task macro wraps an expression, in our example a call to fib_w_parameters, in a Task without running it. The schedule() function then adds the task passed as its argument to the scheduler's queue so that it gets executed.

We can execute a task using the following lines of code:

In [2]:
a = @task fib_w_parameters(7)
schedule(a)
Empezando el Calculo de Fibonacci
Out[2]:
Task (runnable) @0x00007feea9825120
Numero 1 de la serie de fibonacci: 0
Numero 

Equivalently, we can call the Task constructor directly, passing our function call wrapped in an anonymous function:

In [3]:
a = Task(() -> fib_w_parameters(7)); schedule(a)
2 de la serie de fibonacci: 1
Numero 3
Out[3]:
Task (runnable) @0x00007feea8fe6710
 de la serie de fibonacci: 1
Numero 4 de la serie de fibonacci: 
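The printed lines above spill into later cells because a scheduled task runs asynchronously. A minimal sketch of the task life cycle, using a throwaway task t that just sums a range (not part of the notebook), shows how wait and fetch can be used to block until a task is done:

```julia
t = @task sum(1:10)        # wrapped in a Task, not executed yet
println(istaskstarted(t))  # false: nothing has run so far
schedule(t)                # hand the task to the scheduler
wait(t)                    # block until the task has finished
println(istaskdone(t))     # true
println(fetch(t))          # 55, the value returned by the task body
```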

As a reminder, the Fibonacci series is defined recursively. With the indexing used by fib_w_parameters above, whose first value is 0:

$$x_1=0, \quad x_2=1,\quad x_n=x_{n-1}+x_{n-2} \;\;(n>2)$$

😎

Activity 1:

Reader: try to code the same coroutine in Julia, this time using the recursive formula (don't forget that Julia supports the functional paradigm too).
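One possible starting point for the activity is sketched below. It assumes the same indexing as fib_w_parameters (first value 0, second value 1); the name fib_rec is hypothetical, and the naive recursion is only meant for small n.

```julia
# Recursive definition: x_1 = 0, x_2 = 1, x_n = x_{n-1} + x_{n-2}
fib_rec(n) = n == 1 ? 0 : n == 2 ? 1 : fib_rec(n - 1) + fib_rec(n - 2)

# Wrap the computation of the first 7 values in a task and run it
t = @task [fib_rec(k) for k in 1:7]
schedule(t)
println(fetch(t))   # fetch waits for the task and returns its result
```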

Channels 🕳️

To impose an order of execution among lightweight threads, communication primitives are necessary.

Julia offers Channel for this purpose. Channel(func) creates a new task from func, binds it to a new channel of type ctype and size csize, and schedules the task, while Channel{T}(sz) creates a buffered channel of type T and size sz. Channels thus serve as a way to communicate between tasks. Whenever code performs a communication operation such as fetch or wait, the current task is suspended and the scheduler picks another task to run. A task is restarted when the event it is waiting for completes.
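A minimal sketch of the buffered form, with illustrative values that are not from the notebook:

```julia
c = Channel{Int}(2)   # buffered channel with room for 2 Ints
put!(c, 10)           # does not block: the buffer has free space
put!(c, 20)
first_val = take!(c)  # values come out in the order they were put in
second_val = take!(c)
close(c)
println((first_val, second_val))
```

A third put! before any take! would have suspended the current task until another task made room in the buffer.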

For many problems, it is not necessary to think about tasks directly. However, they can be used to wait for multiple events at the same time, which provides for dynamic scheduling.

In dynamic scheduling ⏲️, a program decides what to compute or where to compute it based on when other jobs finish. This is needed for unpredictable or unbalanced workloads, where we want to assign more work to processes only when they finish their current tasks.

Relevant Functions:

put!()
take!()

These functions put values into and take 🚮 values from a channel

In [4]:
function fib(c::Channel)
    # 🐇 -> 🕳️
    # "I am a succulent Fibonacci series entering the Fibonacci channel"
    put!(c, "Soy una suculenta serie de Fibonacci entrando en el canal de Fibonacci")
    a = 0
    b = 1
    for n=1:6
        a, b = b, a+b
        put!(c, b)
    end
    # "I am a succulent Fibonacci series leaving the Fibonacci channel"
    put!(c, "Soy una suculenta serie de Fibonacci saliendo del canal de Fibonacci")
    # 🕳️ -> 🐇
end
2
Numero 5 de la serie de fibonacci: 3
Out[4]:
fib (generic function with 1 method)
Numero 6 de la serie de fibonacci: 5

Emoji Standard Process Notation (ESPN):

🅰️ = 0

🅱️ = 1

🔁 1 ... 6:

🅰️, 🅱️ = 🅱️, 🅰️ ➕ 🅱️
🅱️ -> 🐇🕳️

To enable the Fibonacci channel 🐇🕳️ we use the next lines of code:

In [5]:
chnl = Channel(fib);
println(take!(chnl))
println(take!(chnl))
take!(chnl)
Numero 7 de la serie de fibonacci: 8
Llegamos al ultimo calculo de Fibonacci
Soy una suculenta serie de Fibonacci entrando en el canal de Fibonacci
Empezando el Calculo de Fibonacci
Numero 1 de la serie de fibonacci: 0
Numero 2 de la serie de fibonacci: 1
Numero 3 de la serie de fibonacci: 1
Numero 4 de la serie de fibonacci: 2
Numero 5 de la serie de fibonacci: 3
Numero 6 de la serie de fibonacci: 5
Numero 7 de la serie de fibonacci: 8
Llegamos al ultimo calculo de Fibonacci
1
Out[5]:
2

We can also iterate over all the values of a channel using a for ... in loop 🔁. The loop ends when the channel is closed, which happens once the task bound to it finishes.

In [6]:
for i in Channel(fib)
    println(i)
end
Soy una suculenta serie de Fibonacci entrando en el canal de Fibonacci
1
2
3
5
8
13
Soy una suculenta serie de Fibonacci saliendo del canal de Fibonacci
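The same behaviour can be sketched with a smaller producer. The function squares below is a made-up example: because Channel(squares) binds the channel to the producer task, the channel closes when that task returns, and collect (like the for loop) stops there.

```julia
# Hypothetical producer: puts the first five squares into the channel
function squares(c::Channel)
    for k in 1:5
        put!(c, k^2)
    end
end

ch = Channel(squares)   # creates the channel and schedules the producer task
vals = collect(ch)      # drains the channel until it is closed
println(vals)
println(isopen(ch))     # false: the producer finished, so the channel closed
```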

Multiple Channels 🕳️...🕳️

@async

The @async macro takes care of wrapping an expression in a closure, creating a Task from it, and scheduling that task. It returns the task object, but we don't need to store it here.
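As a small sketch of this behaviour, the snippet below starts two tasks with @async and uses the standard @sync macro (not used elsewhere in this notebook) to wait for both. The vector finished and the sleep durations are illustrative assumptions.

```julia
finished = String[]

@sync begin                      # wait for every @async started inside
    @async begin
        sleep(0.2)               # pretend to do slow work
        push!(finished, "slow")
    end
    @async begin
        sleep(0.05)              # pretend to do quick work
        push!(finished, "fast")
    end
end

println(finished)                # the quick task finishes first
```

Both tasks are started immediately, so the total wait is close to the longer sleep rather than the sum of the two.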

For our first example we create 2 channels, the first one called jobs 🛠️ and the second one called results 📉

In [7]:
const jobs = Channel{Int}(32); # 🛠️🕳️
const results = Channel{Tuple}(32); # 📉🕳️

The do_work function iterates 🔁 over the jobs channel. For each job it generates a random number 🔀 that is treated as the execution time ⏱️, calls the sleep 💤 function with that time (simulating a process), and finally puts the job id together with the execution time into the results channel.

Emoji Standard Process Notation (ESPN):

🔁 🛠️ in 🛠️...🛠️:

🔀⏱️ -> 💤
🔀⏱️ -> 📉🕳️
In [8]:
function do_work()
           for job_id in jobs
               exec_time = rand()
               sleep(exec_time)                # simulates elapsed time doing actual work
                                               # typically performed externally.
               put!(results, (job_id, exec_time))
           end
       end;

The make_jobs function iterates 🔁 from 1 to 🇳 and puts each iterator value ℹ️ into the jobs channel 🛠️🕳️

Emoji Standard Process Notation (ESPN):

🔁 ℹ️ in 1...🇳:

ℹ️ -> 🛠️🕳️
In [9]:
function make_jobs(n)
           for i in 1:n
               put!(jobs, i)
           end
       end;

Defining the number of jobs 🇳

In [10]:
n = 12; # Number of jobs

Executing make_jobs with 🇳 as parameter

Emoji Standard Process Notation (ESPN):

💡make_jobs(🇳)

In [11]:
@async make_jobs(n); # feed the jobs channel with "n" jobs

Now we iterate 🔁 to start 4 asynchronous tasks 💡, each running do_work

Emoji Standard Process Notation (ESPN):

🔁 ℹ️ in 1...4:

💡do_work
In [12]:
for i in 1:4 # start 4 tasks to process requests in parallel
   @async do_work()
end

For the final execution we are going to use

@elapsed

This macro measures the time elapsed while running the next loop 🔁. In every iteration the value of 🇳 is decremented, and we take 🚮 a value from the results channel 📉🕳️ and print it.

Emoji Standard Process Notation (ESPN):

🔁 🇳 > 0:

🇳⬇️

ℹ️, 🔀⏱️ <- 📉🕳️

print(ℹ️, 🔀⏱️)
In [13]:
@elapsed while n > 0 # print out results
   global n = n - 1
   job_id, exec_time = take!(results)
   println("$job_id finished in $(round(exec_time; digits=2)) seconds")
end
3 finished in 0.02 seconds
2 finished in 0.05 seconds
6 finished in 0.33 seconds
1 finished in 0.39 seconds
4 finished in 0.4 seconds
9 finished in 0.26 seconds
5 finished in 0.96 seconds
10 finished in 0.45 seconds
7 finished in 0.84 seconds
8 finished in 0.99 seconds
11 finished in 0.93 seconds
12 finished in 0.96 seconds
Out[13]:
2.038185842
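In the run above, the twelve reported sleep times add up to roughly 6.6 seconds, yet the loop finished in about 2 seconds because four tasks were sleeping concurrently. The sketch below packs the same pattern into one self-contained function. The name run_pool, the local channels, and the deterministic sleep times are assumptions made for illustration; closing the jobs channel lets each worker's for loop end once the jobs run out.

```julia
function run_pool(njobs, nworkers)
    jobs = Channel{Int}(njobs)
    results = Channel{Tuple{Int,Float64}}(njobs)

    for i in 1:njobs
        put!(jobs, i)                    # feed all jobs up front
    end
    close(jobs)                          # no more jobs will arrive

    @sync for _ in 1:nworkers            # wait until every worker is done
        @async for job_id in jobs        # each worker pulls jobs as it gets free
            exec_time = 0.01 * job_id    # stand-in for real work
            sleep(exec_time)
            put!(results, (job_id, exec_time))
        end
    end

    close(results)
    return collect(results)              # remaining buffered results
end

out = run_pool(8, 4)
println(length(out), " jobs finished")
```

A worker that draws short jobs simply comes back for more, which is the dynamic scheduling described earlier.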

Multi-Threading (Experimental) 🥈

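In addition to tasks, Julia natively supports multi-threading 🧵...🧵

Setup

By default, Julia starts up with a single thread 🧵 of execution. The current number of threads can be checked with the command below. In this notebook Julia had already been started with four threads, which is why it reports 4: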

In [14]:
Threads.nthreads()
Out[14]:
4

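We can set 4 threads 🧵 by defining the following environment variable before starting Julia (Julia 1.5 and later also accept the command-line flag julia --threads 4):

export JULIA_NUM_THREADS=4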

In [15]:
Threads.nthreads()
Out[15]:
4

We define a variable called a 🅰️ as an array 🍱 of ten zeros

Emoji Standard Process Notation (ESPN):

🅰️ = 🍱(10, 0)

In [16]:
a = zeros(10)
Out[16]:
10-element Array{Float64,1}:
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0

⛓️ Parallel loops 🔁 using Threads 🧵

We are going to use:

Threads.threadid()

to get the id ℹ️ of the current thread 🧵

Emoji Standard Process Notation (ESPN):

⛓️🔁 1...10

🅰️🍱[ℹ️] = Threads.threadid()

print(🅰️🍱)

In [17]:
Threads.@threads for i = 1:10
           a[i] = Threads.threadid()
       end

println(a)
[1.0, 1.0, 1.0, 2.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0]
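The ids above are printed as 1.0, 2.0, and so on only because zeros(10) creates a Float64 array. A small variation, sketched here with a hypothetical array name ids, stores them as integers instead. Which iteration lands on which thread depends on the number of threads and on the scheduler.

```julia
using Base.Threads

ids = zeros(Int, 10)            # integer array, so thread ids stay integers

@threads for i in 1:10
    ids[i] = threadid()         # each iteration writes only to its own slot
end

println(ids)
println("threads available: ", nthreads())
```

Writing to distinct indices is what makes this loop safe without any synchronization.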

Atomic Operations ⚛️

Julia supports accessing and modifying values atomically, that is, in a thread-safe 🧵 way that avoids race conditions 🏇. A value (which must be of a primitive type) can be wrapped as Threads.Atomic 🧵⚛️ to indicate that it must be accessed in this way. Here is an example:

Threads.atomic_add!(i, id) atomically adds id to i and returns the old value of i

Emoji Standard Process Notation (ESPN):

ℹ️ = 🧵⚛️(0)

ids = 🍱(4, 0)

old_is = 🍱(4, 0)

⛓️🔁 id in 1...4

old_is🍱[id] = 🧵⚛️➕(i, id)

ids🍱[id] = id
In [18]:
i = Threads.Atomic{Int}(0)
ids = zeros(4)
old_is = zeros(4)

Threads.@threads for id in 1:4
           old_is[id] = Threads.atomic_add!(i, id)
           ids[id] = id
       end
In [19]:
old_is
Out[19]:
4-element Array{Float64,1}:
 0.0
 1.0
 3.0
 6.0
In [20]:
ids
Out[20]:
4-element Array{Float64,1}:
 1.0
 2.0
 3.0
 4.0
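The values in old_is are the contents of i just before each addition, since atomic_add! returns the old value. A single-threaded sketch with a made-up counter x makes that return value explicit:

```julia
using Base.Threads

x = Atomic{Int}(0)
old1 = atomic_add!(x, 5)   # x becomes 5, the call returns the previous value
old2 = atomic_add!(x, 2)   # x becomes 7, the call returns 5
println((old1, old2, x[])) # x[] reads the current value
```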

Had we tried to do the addition without the atomic wrapper 🧵⚛️, we might have gotten the wrong answer due to a race condition 🏇.

In the next example we iterate ⛓️🔁 1000 times, adding one to a variable called acc in each iteration.

This is an example of what happens if we don't avoid the race:

Emoji Standard Process Notation (ESPN):

acc = 0

⛓️🔁 ℹ️ in 1...1000

acc += 1

print(acc)

In [21]:
using Base.Threads


acc = Ref(0)

# Without atomic Tag
@threads for i in 1:1000
          acc[] += 1
       end

acc[]
Out[21]:
264

By wrapping the value in Threads.Atomic 🧵⚛️ we avoid the race condition 🏇. To distinguish the result from the previous one, we renamed acc to acc1.

Emoji Standard Process Notation (ESPN):

acc1 = 🧵⚛️(0)

⛓️🔁 ℹ️ in 1...1000

🧵⚛️➕(acc1, 1)

print(acc1)

In [22]:
# With Atomic Tag

acc1 = Atomic{Int}(0)
@threads for i in 1:1000
         atomic_add!(acc1, 1)
       end

acc1[]
Out[22]:
1000
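Atomics only cover primitive types. For other shared state, one common alternative is to guard the update with a lock. The sketch below is not part of the original notebook; it assumes a recent Julia version (1.3 or later) in which ReentrantLock is safe to use across threads, and the names acc2 and lk are hypothetical.

```julia
using Base.Threads

acc2 = Ref(0)
lk = ReentrantLock()

@threads for i in 1:1000
    lock(lk) do            # only one thread at a time runs this block
        acc2[] += 1
    end
end

println(acc2[])
```

The lock serializes the increments, so it is slower than atomic_add! for a plain counter but works for arbitrary updates.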

Side effects 🤕🤢 and mutable function arguments

When using multi-threading 🧵...🧵 we have to be careful 🧐 when using functions that are not pure, as we might get a wrong answer. For instance, functions whose names end with ! modify their arguments by convention and thus are not pure. However, there are also functions that have side effects even though their names do not end with !.
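The naming convention can be seen with sort and sort! from Base, used here on a made-up vector v:

```julia
v = [3, 1, 2]

w = sort(v)     # pure: returns a sorted copy and leaves v untouched
println(v)      # [3, 1, 2]

sort!(v)        # not pure: sorts v in place
println(v)      # [1, 2, 3]
```

If several threads called sort!(v) on the same v at once, they would all be modifying the same memory, which is exactly the situation to avoid.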

The next function f() has a race condition 🏇 because findfirst, although its name does not end with !, is not pure here: findfirst(regex, str) mutates its regex argument, and the Regex object rx is shared by all threads.

As ESPN is not a language that supports regular expressions, we can't translate it 😭

In [23]:
# The race does not show up in recent versions of Julia; to reproduce it, test on Julia 1.0.4
println(Threads.nthreads())
function f()
           s = repeat(["123", "213", "231"], outer=1000)
           x = similar(s, Int)
           rx = r"1"
           Threads.@threads for i in 1:3000
               x[i] = findfirst(rx, s[i]).start
           end
           count(v -> v == 1, x)
       end
f() # the correct result is 1000
4
Out[23]:
1000

In f_fix we replace the single rx, a regular expression shared among the 4 threads (which was the cause of the race condition), with an array holding one Regex per available thread, so that each thread uses only its own entry:

In [24]:
function f_fix()
             s = repeat(["123", "213", "231"], outer=1000)
             x = similar(s, Int)
             rx = [Regex("1") for i in 1:Threads.nthreads()]
             Threads.@threads for i in 1:3000
                 x[i] = findfirst(rx[Threads.threadid()], s[i]).start
             end
             count(v -> v == 1, x)
         end
f_fix()
Out[24]:
1000