Julia's parallel programming platform uses Tasks to switch among multiple computations.
Tasks are a control flow feature that allows computations to be suspended and resumed in a flexible manner. This feature is sometimes called by other names, such as symmetric coroutines, lightweight threads, cooperative multitasking, or one-shot continuations.
For our first coroutine, we use a function that calculates the first n values of the Fibonacci series with an iterative approach (it could also be written recursively).
We also introduce a special notation to make it easier to understand what the Julia code is doing:
Emoji Standard Process Notation (ESPN):
🅰️ = 0
🅱️ = 1
loop 0 ... n-3:
    🅰️, 🅱️ = 🅱️, 🅰️➕🅱️
function fib_w_parameters(n)
    # prints the first n Fibonacci numbers (n >= 2), starting from 0
    println("Starting the Fibonacci calculation")
    a = 0
    b = 1
    println("Number ", 1, " of the Fibonacci series: ", a)
    println("Number ", 2, " of the Fibonacci series: ", b)
    for t = 0:n-3
        a, b = b, a + b
        println("Number ", t + 3, " of the Fibonacci series: ", b)
    end
    println("We reached the last Fibonacci calculation")
end
@task
This macro is commonly used to define a task from an expression, in our example a call to fib_w_parameters. The task is created but not started. The schedule() function then adds the task passed as its argument to the scheduler's queue so that it runs.
We can create and run the task with the following lines:
a = @task fib_w_parameters(7)
schedule(a)
Equivalently, we can call the Task constructor directly, wrapping our function call in an anonymous function:
a = Task(() -> fib_w_parameters(7)); schedule(a)
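To make the difference between creating and running a task visible, here is a minimal sketch. It assumes a recent Julia 1.x, where fetch works on a Task. The function work is a hypothetical stand-in for fib_w_parameters that returns a value instead of printing.

```julia
# `work` is a hypothetical function used only for this illustration.
work() = sum(1:10)

t = @task work()                   # the task is created but has not started
started_before = istaskstarted(t)  # false

schedule(t)                        # add the task to the scheduler's queue
wait(t)                            # block the current task until `t` finishes

done_after = istaskdone(t)         # true
result = fetch(t)                  # the value returned by work()
println((started_before, done_after, result))   # (false, true, 55)
```

fetch also waits for the task, so the explicit wait is only there to make the sequence of states easy to follow.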
As a reminder, the Fibonacci series is defined recursively as:
$$x_1=1, \quad x_2=1,\quad x_n=x_{n-1}+x_{n-2} \;\;(n>2)$$
(fib_w_parameters additionally prints a leading 0, so its k-th printed value is $x_{k-1}$ with $x_0=0$.)
Activity 1:
Reader, try to code the same coroutine in Julia using the recursive formula (don't forget that Julia supports the functional paradigm too).
To express an order of execution between lightweight threads, communication primitives are necessary.
Julia offers Channel for this purpose. Channel{T}(sz) creates a buffered channel that can hold up to sz objects of type T. Channel(func) creates a new task from func, binds it to a new channel and schedules the task; the element type and buffer size of that channel are given by the ctype and csize keyword arguments in older Julia versions, or as Channel{T}(func, sz) since Julia 1.3. Channels serve as a way to communicate between tasks: whenever code performs a communication operation like fetch or wait, the current task is suspended and a scheduler picks another task to run. A task is restarted when the event it is waiting for completes.
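A buffered channel behaves like a first-in, first-out queue with a fixed capacity. The following minimal sketch uses a capacity of 2, chosen only for illustration. It stays within the buffer size, so no put! ever has to block.

```julia
# Buffered channel of element type Int with room for 2 values.
ch = Channel{Int}(2)

put!(ch, 10)                 # returns immediately: the buffer has free space
put!(ch, 20)                 # the buffer is now full; another put! would block this task

first_value  = take!(ch)     # 10, channels are first-in, first-out
second_value = take!(ch)     # 20
empty_now    = !isready(ch)  # true: nothing is left to take

close(ch)                    # no more values will be put into this channel
println((first_value, second_value, empty_now, isopen(ch)))  # (10, 20, true, false)
```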
For many problems, it is not necessary to think about tasks directly. However, they can be used to wait for multiple events at the same time, which provides for dynamic scheduling.
In dynamic scheduling ⏲️, a program decides what to compute or where to compute it based on when other jobs finish. This is needed for unpredictable or unbalanced workloads, where we want to assign more work to processes only when they finish their current tasks.
Relevant Functions:
put!()
take!()
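These functions put a value into a channel and take a value out of it. put! blocks while a buffered channel is full (for an unbuffered channel, until another task is ready to take the value), and take! blocks until a value is available.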
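Here is a minimal sketch of put! and take! with a producer bound to a channel through Channel(func). The producer squares is hypothetical. When the bound function returns, the channel is closed, and a take! that finds no value left then throws an InvalidStateException.

```julia
# `squares` is a hypothetical producer used only for this illustration.
function squares(c::Channel)
    for k in 1:3
        put!(c, k^2)      # hands one value over to whoever calls take!
    end
end                       # when the function returns, the bound channel is closed

chnl = Channel(squares)
got = [take!(chnl) for _ in 1:3]    # [1, 4, 9]

# A fourth take! finds no value: it waits until the producer has finished
# and the channel is closed, and then throws an InvalidStateException.
fourth = try
    take!(chnl)
catch err
    err isa InvalidStateException ? :closed : rethrow()
end
println(got, " ", fourth)
```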
function fib(c::Channel)
    # entering the channel
    put!(c, "I am a juicy Fibonacci series entering the Fibonacci channel")
    a = 0
    b = 1
    for n = 1:6
        a, b = b, a + b
        put!(c, b)
    end
    put!(c, "I am a juicy Fibonacci series leaving the Fibonacci channel")
    # leaving the channel
end
Emoji Standard Process Notation (ESPN):
🅰️ = 0
🅱️ = 1
loop 1 ... 6:
    🅰️, 🅱️ = 🅱️, 🅰️➕🅱️
    🅱️ -> Fibonacci channel
To create the Fibonacci channel and take values from it, we use the following lines:
chnl = Channel(fib);
println(take!(chnl))
println(take!(chnl))
take!(chnl)
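We can also iterate over a channel with a for-in loop. The loop takes values until the channel is closed and empty, so a channel can be consumed only once; here a new Channel(fib) is created for the loop:
for i in Channel(fib)
    println(i)
end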
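Because iteration stops once the producer has returned and the channel is closed, collect can drain a channel into an array. The sketch below assumes Julia 1.3 or later for the Channel{T}(func) do-block form. It uses a hypothetical fib_channel(n) that returns a typed Channel{Int} with the first n values, starting from 0 like fib_w_parameters.

```julia
# Hypothetical helper: a typed Fibonacci producer for the first n values.
function fib_channel(n::Integer)
    return Channel{Int}() do c   # unbuffered channel bound to this producer task
        a, b = 0, 1
        for _ in 1:n
            put!(c, a)           # blocks until a consumer takes the value
            a, b = b, a + b
        end
    end                          # the channel is closed when the function returns
end

fib_values = collect(fib_channel(7))   # iterates until the channel is closed
println(fib_values)                    # [0, 1, 1, 2, 3, 5, 8]
```

Typing the channel as Channel{Int} means collect returns a Vector{Int} rather than a Vector{Any}.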
@async
The @async macro takes care of wrapping an expression in a function, creating a Task from it and scheduling that task. It returns the Task object, but we don't need to store it for anything.
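A minimal sketch of that behavior: @async returns the Task immediately without waiting for the work, and we can still keep the Task and fetch its result later. The sleep time and the returned symbol are arbitrary choices for the example.

```julia
# @async wraps the expression in a Task and schedules it right away.
t_async = @async begin
    sleep(0.1)        # simulated work
    :done
end

not_blocked = !istaskdone(t_async)   # true: @async returned before the work finished
async_result = fetch(t_async)        # waits for the task and returns its value
println((not_blocked, async_result)) # (true, :done)
```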
For our first example we create two channels, the first one called jobs and the second one called results:
const jobs = Channel{Int}(32);       # jobs channel
const results = Channel{Tuple}(32);  # results channel
The do_work function iterates over the jobs channel. For each job it draws a random number that serves as its execution time ⏱️, calls sleep 💤 with that time (simulating a process), and finally puts the job id and the execution time into the results channel.
Emoji Standard Process Notation (ESPN):
loop job in jobs channel:
    random ⏱️ -> 💤
    (job, random ⏱️) -> results channel
function do_work()
    for job_id in jobs
        exec_time = rand()
        sleep(exec_time) # simulates elapsed time doing actual work
                         # typically performed externally.
        put!(results, (job_id, exec_time))
    end
end;
The make_jobs function iterates from 1 to n and puts each value ℹ️ into the jobs channel.
Emoji Standard Process Notation (ESPN):
loop ℹ️ in 1...n:
    ℹ️ -> jobs channel
function make_jobs(n)
    for i in 1:n
        put!(jobs, i)
    end
end;
We define the number of jobs n:
n = 12; # number of jobs
Then we run make_jobs with n as its parameter in an asynchronous task:
Emoji Standard Process Notation (ESPN):
async make_jobs(n)
@async make_jobs(n); # feed the jobs channel with "n" jobs
Now we start 4 asynchronous worker tasks:
Emoji Standard Process Notation (ESPN):
loop ℹ️ in 1...4:
    async do_work()
for i in 1:4 # start 4 tasks that process requests concurrently
    @async do_work()
end
For the final execution we are going to use
@elapsed
This macro evaluates an expression and returns the number of seconds it took. We use it to time the next loop: in every iteration n is decremented, and a (job_id, exec_time) value is taken from the results channel and printed.
Emoji Standard Process Notation (ESPN):
loop while n > 0:
    n⬇️
    ℹ️, random ⏱️ <- results channel
    print(ℹ️, random ⏱️)
@elapsed while n > 0 # print out results
    global n = n - 1
    job_id, exec_time = take!(results)
    println("$job_id finished in $(round(exec_time; digits=2)) seconds")
end
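The example above leaves the four workers blocked forever on the jobs channel, because jobs is never closed. Below is a self-contained sketch of the same jobs/results pattern that shuts down cleanly. The function name run_pool and the fixed 0.01 s work time are illustrative assumptions. The producer closes jobs once every id has been queued, so each worker's for loop ends. @sync then waits for all workers before results is closed and drained.

```julia
# Hypothetical self-contained version of the jobs/results worker pool.
function run_pool(njobs::Int, nworkers::Int)
    jobs    = Channel{Int}(32)
    # must hold every result, since results are read only after the workers finish
    results = Channel{Tuple{Int,Float64}}(max(njobs, 1))

    # Producer: enqueue all job ids, then close so the workers' loops can end.
    @async begin
        for i in 1:njobs
            put!(jobs, i)
        end
        close(jobs)
    end

    # Workers: each one takes jobs until the jobs channel is closed and empty.
    @sync for _ in 1:nworkers
        @async begin
            for job_id in jobs
                exec_time = 0.01          # fixed "work" time instead of rand()
                sleep(exec_time)
                put!(results, (job_id, exec_time))
            end
        end
    end
    close(results)                        # iteration below drains the buffered values

    return sort([job_id for (job_id, _) in results])
end

finished = run_pool(12, 4)
println(finished)   # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
```

Closing the producer's channel is what lets `for job_id in jobs` terminate. Without it, the @sync block would never return.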
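Threads.nthreads() returns the number of threads 🧵 Julia is running with:
Threads.nthreads()
The number of threads is fixed when Julia starts, so we set 4 threads 🧵 in the shell before launching Julia (since Julia 1.5, starting it with julia --threads 4 does the same):
export JULIA_NUM_THREADS=4
Threads.nthreads()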
We create a variable called a 🅰️ holding an array of ten zeros:
Emoji Standard Process Notation (ESPN):
🅰️ = array(10, 0)
a = zeros(10)
Parallel loops using Threads 🧵
We are going to use:
Threads.threadid()
to get the id ℹ️ of the thread 🧵 that runs the current iteration.
Emoji Standard Process Notation (ESPN):
parallel loop ℹ️ in 1...10:
    🅰️[ℹ️] = Threads.threadid()
print(🅰️)
Threads.@threads for i = 1:10
    a[i] = Threads.threadid()
end
println(a)
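The loop above is safe without any synchronization because each iteration writes only to its own element of a. Here is a minimal sketch of the same idea used to compute something. The array name squared is arbitrary, and the result is the same whether Julia runs with one thread or several.

```julia
using Base.Threads

# Each iteration writes only to its own slot squared[i], so no two threads ever
# touch the same element: no atomics or locks are needed here.
squared = zeros(Int, 10)
@threads for i in 1:10
    squared[i] = i^2
end
println(squared)   # [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
```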
Julia supports accessing and modifying values atomically, that is, in a thread-safe 🧵 way that avoids race conditions. A value (which must be of a primitive type) can be wrapped as Threads.Atomic to indicate that it must be accessed in this way. Here is an example:
Threads.atomic_add!(i, id) atomically adds id to i and returns the old value of i.
Emoji Standard Process Notation (ESPN):
ℹ️ = atomic(0)
ids = array(4, 0)
old_is = array(4, 0)
parallel loop id in 1...4:
    old_is[id] = atomic➕(ℹ️, id)
    ids[id] = id
i = Threads.Atomic{Int}(0)
ids = zeros(4)
old_is = zeros(4)
Threads.@threads for id in 1:4
    old_is[id] = Threads.atomic_add!(i, id)
    ids[id] = id
end
old_is
ids
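To read old_is correctly, it helps to know that atomic_add! returns the value held before the addition. The following sketch shows this on a single value. It then runs the loop from above to show that the final total is always 1 + 2 + 3 + 4 = 10. Only the order in which the threads observe the old values varies, and whichever addition runs first sees 0.

```julia
using Base.Threads

# atomic_add! returns the value held *before* the addition.
x = Atomic{Int}(5)
old = atomic_add!(x, 3)
println((old, x[]))        # (5, 8)

# Same loop as above: the final value does not depend on thread interleaving.
i = Atomic{Int}(0)
old_is = zeros(Int, 4)
@threads for id in 1:4
    old_is[id] = atomic_add!(i, id)
end
println(i[])               # 10
```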
Had we tried to do the addition without the Atomic wrapper, we might have gotten the wrong answer due to a race condition.
In the next example we run a parallel loop of 1000 iterations, each adding one to a variable called acc.
This is what can happen if we don't avoid the race:
Emoji Standard Process Notation (ESPN):
acc = 0
parallel loop ℹ️ in 1...1000:
    acc += 1
print(acc)
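using Base.Threads
acc = Ref(0)
# Without atomic operations: acc[] += 1 is a read-modify-write that threads can interleave,
# so with more than one thread the result is often smaller than 1000
@threads for i in 1:1000
    acc[] += 1
end
acc[]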
Making the counter atomic avoids the race condition. To compare with the previous value, we name the new counter acc1.
Emoji Standard Process Notation (ESPN):
acc1 = atomic(0)
parallel loop ℹ️ in 1...1000:
    atomic➕(acc1, 1)
print(acc1)
# With an atomic counter
acc1 = Atomic{Int}(0)
@threads for i in 1:1000
    atomic_add!(acc1, 1)
end
acc1[]
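Atomics only cover single primitive values. When an update involves more than one step or more than one variable, a lock is the usual alternative. The sketch below is an alternative technique, not the one used above. It protects the same non-atomic counter with a ReentrantLock, so only one thread at a time runs the body passed to lock.

```julia
using Base.Threads

# Alternative sketch: guard the non-atomic update with a lock.
acc2 = Ref(0)
lk = ReentrantLock()
@threads for i in 1:1000
    lock(lk) do           # only one thread at a time executes this block
        acc2[] += 1
    end
end
println(acc2[])           # 1000
```

For a single counter, atomic_add! is typically cheaper than taking a lock on every iteration.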
When using multi-threading 🧵...🧵 we have to be careful with functions that are not pure, as we might get a wrong answer. For instance, functions whose names end with ! modify their arguments by convention and thus are not pure. However, some functions have side effects even though their names do not end with !.
The following function f() has a race condition for this reason. findfirst does not end with !, yet in Julia 1.0 matching a Regex writes to a match-data buffer stored inside the Regex object, so all the threads sharing rx mutate the same buffer.
As ESPN has no notation for regular expressions, we can't translate this example into it.
### The race in this example may not show up in recent Julia versions; test it on Julia 1.0.4 ###
println(Threads.nthreads())
function f()
    s = repeat(["123", "213", "231"], outer=1000)
    x = similar(s, Int)
    rx = r"1"                      # a single Regex shared by all threads
    Threads.@threads for i in 1:3000
        x[i] = findfirst(rx, s[i]).start
    end
    count(v -> v == 1, x)
end
f() # the correct result is 1000
To fix it, we stop sharing a single regular expression among all the threads, which was the cause of the race condition. rx now holds one Regex per available thread, and each iteration uses the one that belongs to the thread running it:
function f_fix()
    s = repeat(["123", "213", "231"], outer=1000)
    x = similar(s, Int)
    rx = [Regex("1") for i in 1:Threads.nthreads()]   # one Regex per thread
    Threads.@threads for i in 1:3000
        x[i] = findfirst(rx[Threads.threadid()], s[i]).start
    end
    count(v -> v == 1, x)
end
f_fix()
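f_fix relies on threadid() staying the same for the whole iteration, which holds on Julia 1.0. In recent versions (1.8 and later) the default @threads schedule is dynamic, and indexing per-thread buffers by threadid() is no longer guaranteed to be safe. The sketch below assumes Julia 1.3 or later for Threads.@spawn and avoids threadid() altogether: each task builds its own private Regex. The function name f_tasks and the strided split of the indices are illustrative choices.

```julia
using Base.Threads

# Hypothetical variant: each spawned task owns its Regex, so nothing is shared.
function f_tasks()
    s = repeat(["123", "213", "231"], outer=1000)
    x = similar(s, Int)
    ntasks = nthreads()
    tasks = map(1:ntasks) do k
        @spawn begin
            rx = Regex("1")                 # private to this task
            for i in k:ntasks:length(s)     # task k handles k, k+ntasks, k+2ntasks, ...
                x[i] = findfirst(rx, s[i]).start
            end
        end
    end
    foreach(wait, tasks)                    # wait until every task has finished
    return count(v -> v == 1, x)
end

println(f_tasks())   # 1000
```

The tasks write to disjoint indices of x, so, as in the threadid() loop earlier, no further synchronization is needed.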