Multiprocessing and distributed computing
Support for parallel computing in distributed memory systems is implemented in the module Distributed
, which is included in the Julia standard library.
Most modern computers have multiple CPUs, and the computers themselves can be combined into clusters. Using the capabilities of multiple CPUs allows you to speed up many calculations. Performance is affected by two main factors: the speed of the CPUs themselves and the speed of their memory access. It is quite obvious that in a cluster, a single CPU will be able to access the RAM of its computer (node) faster. More surprisingly, a similar situation can be observed on any multi-core computer due to differences in the performance of the main memory and https://www.akkadia.org/drepper/cpumemory.pdf [cache]. Therefore, in a well-designed multiprocessing environment, the binding of a particular memory block to a specific CPU must be controlled. Julia provides a message-based multiprocessing environment: programs can be executed simultaneously as several processes in separate memory areas.
Message passing is implemented in Julia differently than in other environments such as MPI.[In this context, MPI refers to the MPI-1 standard. Since MPI-2, the MPI Standards Development Committee has introduced a new set of interoperability mechanisms, collectively referred to as "Remote Memory Access" (RMA). The motivation for adding rma to the MPI standard was to simplify one-way communication patterns. For more information about the latest MPI standard, see https://mpi-forum.org/docs .]. Interaction in Julia is usually one-sided. This means that in the case of a two-process operation, it is enough for the programmer to explicitly control only one process. Moreover, interaction usually occurs not just in the form of sending and receiving messages, but more like high-level operations, such as calls to user functions.
Distributed programming in Julia is based on two primitives: remote references and remote calls. A remote reference is an object that can be used from any process to access an object stored in a specific other process. A remote call is a request made by one process to call a function with certain arguments in another (or the same) process.
Deleted links can be of two types: method Future
and macros RemoteChannel
.
A remote call returns a link Future
for the result of execution. A remote call returns control immediately: the process that made it proceeds to the next operation while the call itself is being executed somewhere else. To wait for the remote call to end, call the function wait
for the returned object Future
, and to get the full resulting value, use the method fetch
.
In turn, the objects `RemoteChannel' supports overwriting. For example, multiple processes can coordinate processing by referring to the same remote channel (`Channel').
Each process has an identifier associated with it. The process that provides the Julia interactive command prompt always has an id
of 1. The processes used by default for parallel operations are called workers. When there is only one process, process 1 is considered a worker. Otherwise, all processes except process 1 are considered workers. Therefore, to take advantage of parallel processing methods such as `pmap', two or more processes must be used. For example, adding one additional process can be useful if you want the main process to continue performing its operations while the worker is busy with lengthy calculations.
Let’s try to figure it out in practice. When using the julia -p n
command, n
workflows are started on the local computer. As a rule, it makes sense to make n
equal to the number of CPU threads (logical cores) on the computer. Note that when the -p
argument is specified, the module is loaded automatically. Distributed
.
$ julia -p 2
julia> r = remotecall(rand, 2, 2, 2)
Future(2, 1, 4, nothing)
julia> s = @spawnat 2 1 .+ fetch(r)
Future(2, 1, 5, nothing)
julia> fetch(s)
2×2 Array{Float64,2}:
1.18526 1.50912
1.16296 1.60607
The first argument of the method remotecall
— the function being called. When programming in parallel, Julia usually does not specify specific processes or the number of available processes, however 'remotecall` is considered a low-level interface that provides more detailed control. The second argument remotecall
is the id
of the process that will perform the work. The remaining arguments will be passed to the called function.
Thus, in the first row, process 2 was sent a request to create a 2-by-2 matrix, and in the second row, a request to add the value 1 to it. The result of both calculations is available in two Future objects: r
and s
. The macro @spawnat
evaluates the expression specified in the second argument in the process specified in the first argument.
Sometimes it may be necessary to immediately retrieve a value calculated remotely. This usually happens when you receive data from a remote object that is necessary for the next local operation. For this purpose, there is a function remotecall_fetch
. It is equivalent to the function fetch(remotecall(…))+
, but more efficient.
julia> remotecall_fetch(r-> fetch(r)[1, 1], 2, r)
0.18526337335308085
This code returns the first value from the array in workflow 2. Note that the fetch
method in this case does not move data, as it is executed in the workflow that owns the array. It could also be written as follows.
julia> remotecall_fetch(getindex, 2, r, 1, 1)
0.10824216411304866
As you remember, the expression getindex(r,1,1)
is equivalent to r[1,1]
, so this call gets the first element of the Future object r
.
To simplify your task, you can pass it to a macro @spawnat
character :any
for automatic workflow selection.
julia> r = @spawnat :any rand(2,2)
Future(2, 1, 4, nothing)
julia> s = @spawnat :any 1 .+ fetch(r)
Future(3, 1, 5, nothing)
julia> fetch(s)
2×2 Array{Float64,2}:
1.38854 1.9098
1.20939 1.57158
Note that we used 1 .+ fetch(r)
instead of 1 .+ r
. The reason is that we don’t know where the code will be executed, so in general, moving r
to the process where the addition is performed requires fetch
. In this case, the macro '@spawnat` knows that the calculation needs to be performed in the process that r
belongs to, so fetch
will be an idle operation (no work is being performed).
(It is worth noting that @spawnat
— this is not a built-in function, but macro defined in Julia. You can define your own similar constructions.)
It is important to remember that after receiving the object Future
caches its value locally. Further challenges fetch
does not result in a network jump. After receiving all the links Future
the value stored remotely is deleted.
The macro @async
is similar to a macro @spawnat
, but performs tasks only in the local process. It is used to create a provider task (feeder) for each process. Each task gets the next index to be calculated, waits for the process to complete, and then repeats execution until the indexes run out. Note that the supplier tasks do not start executing until the main task reaches the end of the block. @sync
. At this point, it gives up control and waits for all local tasks to be completed, after which the function returns control. Starting from version 0.7, vendor tasks can report their status via nextidx, since they are all executed in the same process. Even if tasks are planned jointly, in some situations, for example, when asynchronous I/O, blocking may be required. This means that context switches only occur at certain times, in this case when calling remotecall_fetch
. This is the current implementation status, which may change in future versions of Julia. Its goal is to ensure that up to N tasks are completed in the number of M processes (`Process'), that is, the model https://en.wikipedia.org/wiki/Thread_ (computing)#Models[M:N multithreading]. nextidx will require a lock-up and release model, as simultaneous reads from a resource and writes to a resource by multiple processes are dangerous.
Code availability and package downloads
Your code should be available to any process in which it is executed. For example, type the following in the Julia command prompt:
julia> function rand2(dims...)
return 2*rand(dims...)
end
julia> rand2(2,2)
2×2 Array{Float64,2}:
0.153756 0.368514
1.15119 0.918912
julia> fetch(@spawnat :any rand2(2,2))
ERROR: RemoteException(2, CapturedException(UndefVarError(Symbol("#rand2"))))
Stacktrace:
[...]
Process 1 knows the function `rand2', but process 2 does not.
The code is usually loaded from files or packages, and you can flexibly control which processes the code is loaded into. Let’s take the DummyModule' file.jl
containing the following code.
module DummyModule
export MyType, f
mutable struct MyType
a::Int
end
f(x) = x^2+1
println("loaded")
end
So that all processes can reference the MyType
, the DummyModule' file.jl
must be loaded into each process. When calling include("DummyModule.jl")`it is loaded into only one process. To upload a file to each process, use the macro
@everywhere` (Julia should be launched with the command `julia -p 2').
julia> @everywhere include("DummyModule.jl")
loaded
From worker 3: loaded
From worker 2: loaded
As usual, the DummyModule
module is not included in the process scope. This requires the use of a keyword. using
or import
. Moreover, when the DummyModule
is injected into the scope in one process, it doesn’t happen in others.:
julia> using .DummyModule
julia> MyType(7)
MyType(7)
julia> fetch(@spawnat 2 MyType(7))
ERROR: On worker 2:
UndefVarError: `MyType` not defined in `Main`
⋮
julia> fetch(@spawnat 2 DummyModule.MyType(7))
MyType(7)
However, you can still pass MyType
to a process in which the DummyModule
module is loaded but not included in the scope.
julia> put!(RemoteChannel(2), MyType(7))
RemoteChannel{Channel{Any}}(2, 1, 13)
The file can also be preloaded into several processes at startup using the -L
flag, and an activation script can be used to activate its calculation.
julia -p <n> -L file1.jl -L file2.jl driver.jl
In the example above, the Julia process executing the activation script has an id
with the value 1, as does the process providing the interactive command prompt.
Finally, if DummyModule.jl
is not a separate file, but a package. The using DummyModule
operator loads DummyModule.jl` is included in all processes, but includes the scope only in the process where the operator was called. using
.
Launching and managing workflows
Two types of clusters are supported in the basic Julia installation:
-
a local cluster created using the '-p` parameter, as shown above;
-
A cluster spanning several computers is created using the
--machine-file
parameter. To run Julia workflows (from the path of the current host) on the specified computers, ssh login is used without a password. Each computer is specified in the following form:[count*][user@]host[:port] [bind_addr[:port]]'. By default, `user
is the current user, andport
is the standard SSH port.count
is the number of worker processes spawned on a node (1 by default). The optionalbind-to bind_addr[:port]
element defines the IP address and port that other worker processes should use to connect to this process.
Although backward compatibility is usually provided in Julia, the distribution of code across workflows is done using |
The following functions are available for programmatically adding, deleting, and querying processes in a cluster: addprocs
, rmprocs
, workers
and others.
julia> using Distributed
julia> addprocs(2)
2-element Array{Int64,1}:
2
3
Before the challenge addprocs
module Distributed
must be explicitly loaded into the main process. It becomes available automatically in workflows.
Please note that worker processes do not execute the startup script |
To support other types of clusters, you can write a custom implementation of ClusterManager, as described below in the section ClusterManager objects.
Moving data
In a distributed program, most of the work is done on sending messages and moving data. To ensure high performance and scalability, it is important to minimize the number of messages and the amount of data transferred. To do this, you need to understand how the various distributed programming constructs in Julia move data.
fetch
can be regarded as an explicit data transfer operation, since moving an object to a local computer is requested directly. The macro @spawnat
(and a number of related constructs) also moves data, but in a less obvious way, so this can be called an implicit data movement operation. Let’s consider two approaches to creating a random matrix and squaring it.
Method 1
julia> A = rand(1000,1000);
julia> Bref = @spawnat :any A^2;
[...]
julia> fetch(Bref);
Method 2
julia> Bref = @spawnat :any rand(1000,1000)^2;
[...]
julia> fetch(Bref);
The difference seems trivial, but it actually makes a significant difference due to the behavior of the macro. @spawnat
. In the first method, a random matrix is created locally, and then transferred to another process, where it is squared. In the second method, a random matrix is both created and squared in another process. Therefore, much less data is transmitted in the second case than in the first.
In this simplest example, the difference between the two methods is obvious, so it’s easy to choose the right one. However, in a real-world program, it may be necessary to carefully consider moving data and possibly take measurements. For example, if the matrix A
is needed by the first process, it may be better to use the first method. If calculating the matrix A
requires a lot of resources and only the current process has them, then it will inevitably have to be moved to another process. Finally, if the current process has little to do between calls '@spawnat' and fetch(Bref)', it might be better to abandon parallelism altogether. Let’s also imagine that instead of `rand(1000,1000)"we have a more complicated operation. In this case, it might make sense to add another operator.
@spawnat` exclusively for her.
Global variables
Expressions executed remotely by @spawnat
, or closures assigned to remote execution using remotecall
, can refer to global variables. Global bindings in the Main
module are handled slightly differently than global bindings in other modules. Consider the following code snippet.
A = rand(10,10)
remotecall_fetch(()->sum(A), 2)
In this case, the function sum
MUST be defined in the remote process. Note that A
is a global variable defined in the local workspace. Workflow 2 does not have a variable named A
in Main'. Sending the closure `+()→sum(A)+
to workflow 2 causes it to define Main.A
. Main.A
continues to exist in workflow 2 even after call control returns. remotecall_fetch
. Remote calls with embedded global references (only in the `Main' module) manage global variables as follows.
-
New global variables are created in target workflows if they are accessed as part of a remote call.
-
Global constants are also declared as constants on remote nodes.
-
Global variables are passed back to the target workflow only in the context of a remote call and only if the value changes. In addition, global bindings are not synchronized between cluster nodes. For example:
A = rand(10,10) remotecall_fetch(()->sum(A), 2) # рабочий процесс 2 A = rand(10,10) remotecall_fetch(()->sum(A), 3) # рабочий процесс 3 A = nothing
As a result of the above code snippet, the variables are
Main.A
in workflow 2 andMain.A
in workflow 3 will have different values, andMain.A
on node 1 takes the value `nothing'.
As you may have already guessed, the memory associated with global variables can be cleared when they are reassigned in the main process, but this does not happen in workflows, as the bindings continue to operate. If certain global variables in remote nodes are no longer needed, they can be set to nothing
using the method clear!
. As a result, the associated memory is released during the next garbage collection cycle.
Therefore, global variables should be referenced with caution in remote calls. And it’s even better to avoid it altogether if possible. If it is still necessary to refer to global variables, it is recommended to use let
blocks to localize them.
For example:
julia> A = rand(10,10);
julia> remotecall_fetch(()->A, 2);
julia> B = rand(10,10);
julia> let B = B
remotecall_fetch(()->B, 2)
end;
julia> @fetchfrom 2 InteractiveUtils.varinfo()
name size summary
––––––––– ––––––––– ––––––––––––––––––––––
A 800 bytes 10×10 Array{Float64,2}
Base Module
Core Module
Main Module
As you can see, the global variable A
is defined in workflow 2, but B
is captured as a local variable, and therefore there is no binding for B
in workflow 2.
Parallel mapping and loops
Fortunately, for many useful parallel computations, it is not necessary to move data. A common example is Monte Carlo simulation, for which several processes can simultaneously conduct independent model tests. We can use @spawnat
to flip coins in two processes. First, let’s write the following function in the 'count_heads.jl` file.
function count_heads(n)
c::Int = 0
for i = 1:n
c += rand(Bool)
end
c
end
The 'count_heads` function simply adds up n
random bits. As follows, you can run tests on two computers, and then add up the results.
julia> @everywhere include_string(Main, $(read("count_heads.jl", String)), "count_heads.jl")
julia> a = @spawnat :any count_heads(100000000)
Future(2, 1, 6, nothing)
julia> b = @spawnat :any count_heads(100000000)
Future(3, 1, 7, nothing)
julia> fetch(a)+fetch(b)
100001564
This example demonstrates a common and very effective parallel programming pattern. Iterations are performed independently in several processes, and then their results are combined using a function. The process of combining is called reduction, as it usually involves lowering the valence of the tensor: the vector of numbers is reduced to a single number, or the matrix is reduced to a single row or column, etc. In the code, this usually looks like the pattern x = f(x,v[i])
, where x
is the adder, f
is the reduction function, and v[i]
is the reduced elements. It is desirable that the function f
be associative.: This way, the operations can be performed in any order.
Please note that our use case for this pattern in relation to count_heads
can be generalized. We used two explicit operators @spawnat
, so parallelism is limited to two processes. To run in any number of processes, you can use a parallel for loop running in distributed memory. In Julia, it can be written using @distributed
as follows.
nheads = @distributed (+) for i = 1:200000000
Int(rand(Bool))
end
This construction implements a pattern of assigning iterations to several processes, followed by combining their results using the specified reduction (in this case, (+)
). The result of each iteration is the value of the last expression inside the loop. As a result of calculating the entire parallel loop expression, the final answer is obtained.
Keep in mind that although parallel for loops are similar to sequential ones, their behavior is fundamentally different. In particular, iterations do not occur in a specific order, and values assigned to variables or arrays will not be available globally, since iterations are performed in different processes. The variables used inside the parallel loop are copied and transferred to each process.
For example, the following code will not work properly.
a = zeros(100000)
@distributed for i = 1:100000
a[i] = i
end
This code does not initialize all the elements of a
, as there will be a separate copy in each process. Such parallel for loops should be avoided. Fortunately, this limitation can be circumvented using shared arrays.
using SharedArrays
a = SharedArray{Float64}(10)
@distributed for i = 1:10
a[i] = i
end
Using external variables in parallel loops is perfectly justified if the variables are read-only.
a = randn(1000)
@distributed (+) for i = 1:100000
f(a[rand(1:end)])
end
Here, in each iteration, the function f
is applied to a randomly selected element of the vector a
, which is shared by all processes.
As you can see, the reduction operator can be omitted if it is not needed. In this case, the loop runs asynchronously, that is, it generates independent tasks in all available workflows and immediately returns an array of objects. Future
, without waiting for the execution to complete. The caller can wait for the objects to be completed. Future
anywhere further, calling for them fetch
, or wait for execution to complete at the end of the loop, for which you should specify before it @sync
, for example, `@sync @distributed for'.
In some cases, the reduction operator is not needed if we just want to apply the function to all integers in a certain range (or more generally to all elements of some collection). This is another useful operation called parallel matching. It is implemented in Julia through the function pmap
. For example, you can calculate singular values for several large random matrices in parallel mode as follows.
julia> M = Matrix{Float64}[rand(1000,1000) for i = 1:10];
julia> pmap(svdvals, M);
Function 'pmap` in Julia is designed for cases where each function call performs a large amount of work. On the contrary, @distributed for
is suitable for situations where each iteration is very simple, even to the point where two numbers are simply added together. For parallel computing using pmap
and @distributed for
use only workflows. In the case of @distributed for
, the final reduction is performed in the calling process.
Deleted links and AbstractChannel objects
Deleted links are always associated with the implementation of the `AbstractChannel'.
In a specific implementation of AbstractChannel
(for example, Channel
), the methods must be implemented put!
, take!
, fetch
, isready
and wait
. The deleted object that is referenced Future
, stored in Channel{Any}(1)
, that is, in a channel of size 1 for objects of type `Any'.
An object RemoteChannel'
, which supports overwriting, can point to channels of any type and size, or any other implementation of `AbstractChannel.
The RemoteChannel(f) constructor::Function, pid)()
allows you to create links to channels containing more than one value of a certain type. 'f` is a function executed in the 'pid` process. It should return the AbstractChannel
object.
For example, the expression RemoteChannel(()->Channel{Int}(10), pid)
returns a reference to an Int
type channel of size 10. The channel exists in the 'pid` workflow.
Methods put!
, take!
, fetch
, isready
and wait
for the object RemoteChannel
are proxied to the auxiliary storage of the remote process.
Thus, RemoteChannel'
can be used to reference `AbstractChannel objects implemented by the user. A simple example is the following DictChannel
, which uses a dictionary as a remote storage.:
julia> struct DictChannel{T} <: AbstractChannel{T}
d::Dict
cond_take::Threads.Condition # ожидание доступных данных
DictChannel{T}() where {T} = new(Dict(), Threads.Condition())
DictChannel() = DictChannel{Any}()
end
julia> begin
function Base.put!(D::DictChannel, k, v)
@lock D.cond_take begin
D.d[k] = v
notify(D.cond_take)
end
return D
end
function Base.take!(D::DictChannel, k)
@lock D.cond_take begin
v = fetch(D, k)
delete!(D.d, k)
return v
end
end
Base.isready(D::DictChannel) = @lock D.cond_take !isempty(D.d)
Base.isready(D::DictChannel, k) = @lock D.cond_take haskey(D.d, k)
function Base.fetch(D::DictChannel, k)
@lock D.cond_take begin
wait(D, k)
return D.d[k]
end
end
function Base.wait(D::DictChannel, k)
@lock D.cond_take begin
while !isready(D, k)
wait(D.cond_take)
end
end
end
end;
julia> d = DictChannel();
julia> isready(d)
false
julia> put!(d, :k, :v);
julia> isready(d, :k)
true
julia> fetch(d, :k)
:v
julia> wait(d, :k)
julia> take!(d, :k)
:v
julia> isready(d, :k)
false
RemoteChannel channels and objects
-
An object
Channel
is local to the process. Workflow 2 cannot directly referenceChannel
in workflow 3 (and vice versa). However `RemoteChannel' can send and receive values between workflows. -
An object
RemoteChannel
can be represented as an object descriptorChannel
. -
The process ID
pid
associated withRemoteChannel'
, defines a process in which an auxiliary storage exists, that is, an auxiliary object `Channel. -
Any process with a link to 'RemoteChannel' can send items to and receive them from the channel. Data is automatically sent to (or retrieved from) the process that the object is associated with.
RemoteChannel
. -
When serializing a channel
Channel
also serializes all the data in it. Therefore, when deserializing it, a copy of the original object is created. -
On the contrary, when serializing an object
RemoteChannel
only the identifier defining the location and instance is serializedChannel
referenced by the descriptor. Therefore, the deserialized object `RemoteChannel' (in any workflow) points to the same auxiliary storage as the original one.
The above example of channels can be changed so that interprocess communication takes place, as shown below.
To process a single remote jobs channel, four workflows are started. Jobs identified by job_id
are written to the channel. In this example, each remotely executed task reads the job_id
, waits for a random amount of time, and writes a tuple consisting of the job_id
, the timeout, and its own pid
to the results channel. Finally, the main process outputs all the data from the results
channel.
julia> addprocs(4); # Добавляем рабочие процессы
julia> const jobs = RemoteChannel(()->Channel{Int}(32));
julia> const results = RemoteChannel(()->Channel{Tuple}(32));
julia> @everywhere function do_work(jobs, results) # Определяем рабочую функцию через everywhere
while true
job_id = take!(jobs)
exec_time = rand()
sleep(exec_time) # Имитирует время, затрачиваемое на реальную работу
put!(results, (job_id, exec_time, myid()))
end
end
julia> function make_jobs(n)
for i in 1:n
put!(jobs, i)
end
end;
julia> n = 12;
julia> errormonitor(@async make_jobs(n)); # Подаем в канал jobs n заданий
julia> for p in workers() # Запускаем в рабочих процессах задачи для параллельной обработки запросов
remote_do(do_work, p, jobs, results)
end
julia> @elapsed while n > 0 # Выводим результаты
job_id, exec_time, where = take!(results)
println("$job_id finished in $(round(exec_time; digits=2)) seconds on worker $where")
global n = n - 1
end
1 finished in 0.18 seconds on worker 4
2 finished in 0.26 seconds on worker 5
6 finished in 0.12 seconds on worker 4
7 finished in 0.18 seconds on worker 4
5 finished in 0.35 seconds on worker 5
4 finished in 0.68 seconds on worker 2
3 finished in 0.73 seconds on worker 3
11 finished in 0.01 seconds on worker 3
12 finished in 0.02 seconds on worker 3
9 finished in 0.26 seconds on worker 5
8 finished in 0.57 seconds on worker 4
10 finished in 0.58 seconds on worker 2
0.055971741
Deleted links and distributed garbage collection
Objects pointed to by deleted links can only be cleaned up after deleting all linked links in the cluster.
The node where the value is stored monitors the worker processes that refer to it. Every time the object RemoteChannel'
or `Future (whose receipt has been canceled) is serialized into the workflow, and the node to which the link points receives a notification. The node that owns the value also receives a notification every time the object RemoteChannel'
or `Future (whose receipt has been canceled) is cleared by the garbage collector. These actions are implemented in an internal serializer with cluster support. Deleted links are valid only in the context of cluster execution. Serialization of references into regular 'IO` objects and deserialization from them are not supported.
Tracking messages are sent for notification: a message about adding a link when the link is serialized to another process, and a message about deleting the link when the link is cleaned up locally by the garbage collector.
Since the objects Future
are written once and cached locally when applied fetch
to Future
also updates information about tracking links on the node to which the value belongs.
The node that owns the value deletes it after clearing all references to it.
In the case of objects Future
, serialization of the already received Future
is also accompanied by the transfer of the value to another node, since the value in the original remote storage can already be cleared by the collector by this time.
It is important to note that which object is cleaned up by the garbage collector locally depends on the size of the object and the current memory availability on the system.
In the case of remote links, the size of the local link object is quite small, while the value stored in the remote node can be quite large. Since the local object may not be cleaned up by the collector immediately, it is recommended to explicitly call finalize
for local instances RemoteChannel'
or for objects `Future, whose receipt has been canceled. Since when calling fetch
for the object Future
also removes its reference from the remote storage, do this for the received objects. Future
is not required. An explicit challenge finalize
causes an immediate message to be sent to the remote node stating that the reference to the value needs to be deleted.
After elimination, the link becomes invalid and cannot be used in subsequent calls.
Local calls
The data is inevitably copied to the remote call for execution. This happens both during remote calls and when data is saved in RemoteChannel'
or `Future in another node. As expected, this leads to the formation of copies of the serialized objects in the remote node. However, when the destination node is a local node, that is, the ID of the calling process matches the ID of the remote node, the call is executed as local. It is usually (but not always) executed in another task, but there is no serialization or deserialization of the data. Therefore, the call refers to the same instances of objects that are being passed, but no copies are created. This behavior is demonstrated below.
julia> using Distributed;
julia> rc = RemoteChannel(()->Channel(3)); # RemoteChannel создается в локальном узле
julia> v = [0];
julia> for i in 1:3
v[1] = i # Используем `v` повторно
put!(rc, v)
end;
julia> result = [take!(rc) for _ in 1:3];
julia> println(result);
Array{Int64,1}[[3], [3], [3]]
julia> println("Num Unique objects : ", length(unique(map(objectid, result))));
Num Unique objects : 1
julia> addprocs(1);
julia> rc = RemoteChannel(()->Channel(3), workers()[1]); # RemoteChannel is created in the remote node
julia> v = [0];
julia> for i in 1:3
v[1] = i
put!(rc, v)
end;
julia> result = [take!(rc) for _ in 1:3];
julia> println(result);
Array{Int64,1}[[1], [2], [3]]
julia> println("Num Unique objects : ", length(unique(map(objectid, result))));
Num Unique objects : 3
As can be seen from the example, method calls put!
for the local channel RemoteChannel
when the object v
is changed between calls, the same instance of the object is saved. When the node that owns rc
is different, copies of v
are created.
It should be noted that, as a rule, there is no problem in this. This should only be taken into account if the object is saved locally and at the same time changed after the call. In such cases, you may need to keep a `deepcopy' of the object.
This is also true for remote calls on the local node, as shown in the following example.
julia> using Distributed; addprocs(1);
julia> v = [0];
julia> v2 = remotecall_fetch(x->(x[1] = 1; x), myid(), v); # Выполняется в локальном узле
julia> println("v=$v, v2=$v2, ", v === v2);
v=[1], v2=[1], true
julia> v = [0];
julia> v2 = remotecall_fetch(x->(x[1] = 1; x), workers()[1], v); # Running on a remote node
julia> println("v=$v, v2=$v2, ", v === v2);
v=[0], v2=[1], false
As you can see in this case, a remote call to a local node works the same way as a direct call. It modifies the local objects passed as arguments. When making a remote call, operations are performed with a copy of the arguments.
Once again, we repeat that, as a rule, there is no problem in this. If the local node is also used as a computing node and the arguments are used after the call, this feature should be taken into account and, if necessary, pass deep copies of the arguments to the call made in the local node. Calls to remote nodes always work with copies of arguments.
Shared arrays
Shared arrays use the shared memory of the system to map the same array across multiple processes. SharedArray
is a good choice when a large amount of data needs to be accessed simultaneously by two processes or more on the same computer. Support for shared arrays is provided by the SharedArrays
module, which must be explicitly loaded into all participating processes.
An additional data structure is provided by an external package https://github.com/JuliaParallel/DistributedArrays.jl [DistributedArrays.jl
] as a DArray'. Although in some ways the object is similar to `SharedArray
, behavior https://github.com/JuliaParallel/DistributedArrays.jl [DArray
] is different in many ways. In the case of SharedArray
each process working with it has access to the entire array. On the contrary, in the case of an object https://github.com/JuliaParallel/DistributedArrays.jl [DArray
] each process has local access to only a part of the data, and these parts are different for all processes.
SharedArray
is indexed (for the purpose of assigning values and accessing them) just like a regular array, and this happens efficiently, since the corresponding memory area is available to the local process. Therefore, most algorithms naturally work with SharedArray
, albeit in single-process mode. If the algorithm requires an input array Array
, the base array can be extracted from SharedArray
by calling the function sdata
. For other types of AbstractArray' function 'sdata
just returns the object itself, so sdata
can be safely applied to any object of the Array
type.
The constructor of the shared array looks like this.
SharedArray{T,N}(dims::NTuple; init=false, pids=Int[])
It creates an N
-dimensional shared array of bit type T
and size dims
, accessible to the processes specified in pids
. Unlike distributed arrays, the shared array is accessible only from the participating processes specified in the named argument pids' (as well as from the process where it is created if this process is running on the same host). Note that only elements of types are supported in SharedArray. `isbits
.
If the init
function is specified with the signature initfn(S::SharedArray)
, it is called in all participating workflows. You can specify that each workflow should execute the init
function for a separate part of the array in order to parallelize initialization.
Here is a small example.
julia> using Distributed
julia> addprocs(3)
3-element Array{Int64,1}:
2
3
4
julia> @everywhere using SharedArrays
julia> S = SharedArray{Int,2}((3,4), init = S -> S[localindices(S)] = repeat([myid()], length(localindices(S))))
3×4 SharedArray{Int64,2}:
2 2 3 4
2 3 3 4
2 3 4 4
julia> S[3,2] = 7
7
julia> S
3×4 SharedArray{Int64,2}:
2 2 3 4
2 3 3 4
2 7 4 4
Function SharedArrays.localindices
provides separate one-dimensional ranges of indexes and is sometimes useful for dividing tasks between processes. Of course, you can divide the work as you like.
julia> S = SharedArray{Int,2}((3,4), init = S -> S[indexpids(S):length(procs(S)):length(S)] = repeat([myid()], length( indexpids(S):length(procs(S)):length(S))))
3×4 SharedArray{Int64,2}:
2 2 2 2
3 3 3 3
4 4 4 4
Since the underlying data is available to all processes, it is important to ensure that there are no conflicts. For example:
@sync begin
for p in procs(S)
@async begin
remotecall_wait(fill!, p, S, p)
end
end
end
will result in undefined behavior. Since each process fills the entire array with its own pid
, the result will be the pid
of the process that was executed last (for any S
element).
To consider a more complex example, let’s run the following kernel in parallel.
q[i,j,t+1] = q[i,j,t] + u[i,j,t]
In this case, if we try to divide the work using a one-dimensional index, trouble is likely to arise: if q[i,j,t]
is close to the end of the block assigned to one workflow, and q[i,j,t+1]
is close to the beginning of the block. assigned to another process, there is a high probability that the part of q[i,j,t]
will not be ready by the time it is needed to calculate q[i,j,t+1]
. In such cases, it is better to divide the array into parts manually. Let’s divide the array by the second dimension. Define a function that returns the indexes (irange, jrange)
assigned to this workflow.
julia> @everywhere function myrange(q::SharedArray)
idx = indexpids(q)
if idx == 0 # Этому рабочему процессу часть не назначена.
return 1:0, 1:0
end
nchunks = length(procs(q))
splits = [round(Int, s) for s in range(0, stop=size(q,2), length=nchunks+1)]
1:size(q,1), splits[idx]+1:splits[idx+1]
end
Next, we define the core.
julia> @everywhere function advection_chunk!(q, u, irange, jrange, trange)
@show (irange, jrange, trange) # Выведем на экран, чтобы знать, что происходит.
for t in trange, j in jrange, i in irange
q[i,j,t+1] = q[i,j,t] + u[i,j,t]
end
q
end
We will also define an auxiliary wrapper for the implementation of `SharedArray'.
julia> @everywhere advection_shared_chunk!(q, u) =
advection_chunk!(q, u, myrange(q)..., 1:size(q,3)-1)
Now let’s compare the three different versions. Running in the same process:
julia> advection_serial!(q, u) = advection_chunk!(q, u, 1:size(q,1), 1:size(q,2), 1:size(q,3)-1);
Using @distributed
:
julia> function advection_parallel!(q, u)
for t = 1:size(q,3)-1
@sync @distributed for j = 1:size(q,2)
for i = 1:size(q,1)
q[i,j,t+1]= q[i,j,t] + u[i,j,t]
end
end
end
q
end;
Dividing the work into parts:
julia> function advection_shared!(q, u)
@sync begin
for p in procs(q)
@async remotecall_wait(advection_shared_chunk!, p, q, u)
end
end
q
end;
If we create SharedArray
arrays and measure the running time of these functions, we will get the following results (when using the julia -p 4
command).
julia> q = SharedArray{Float64,3}((500,500,500));
julia> u = SharedArray{Float64,3}((500,500,500));
Let’s execute the functions once to perform JIT compilation, and then apply the macro @time
on the second execution.
julia> @time advection_serial!(q, u);
(irange,jrange,trange) = (1:500,1:500,1:499)
830.220 milliseconds (216 allocations: 13820 bytes)
julia> @time advection_parallel!(q, u);
2.495 seconds (3999 k allocations: 289 MB, 2.09% gc time)
julia> @time advection_shared!(q,u);
From worker 2: (irange,jrange,trange) = (1:500,1:125,1:499)
From worker 4: (irange,jrange,trange) = (1:500,251:375,1:499)
From worker 3: (irange,jrange,trange) = (1:500,126:250,1:499)
From worker 5: (irange,jrange,trange) = (1:500,376:500,1:499)
238.119 milliseconds (2264 allocations: 169 KB)
The main advantage of `advection_shared! The reason is that the traffic between the workflows is minimized, so that each process can work with the assigned part for a long time.
Shared arrays and distributed garbage collection
Just like deleted references, shared arrays rely on the garbage collector at the node where they are created to clean up references in all participating workflows. In code that creates many short-term shared arrays, it is advisable to explicitly eliminate these objects as soon as possible. This will make it faster to clear the memory and file descriptors associated with the shared segment.
ClusterManager Objects
Julia processes are started, managed, and combined into a logical cluster using cluster managers. The ClusterManager is responsible for the following actions:
-
running workflows in a cluster environment;
-
event management during the lifetime of each workflow;
-
ensuring data transfer if necessary.
The cluster in Julia has the following features.
-
The initial Julia process, which is also called the master, is special and has an
id
of 1. -
Only the
master
process can add or remove worker processes. -
All processes can interact directly with each other.
Connections between workflows (using the built-in TCP/IP transport) are established as follows.
количество рабочих процессов на соответствующих компьютерах.
-
Each worker process starts listening on a free port and writes information about its host and port to the stream.
stdout
. -
The cluster manager captures the stream 'stdout` of each workflow and makes it available to the main process.
-
The main process analyzes this information and establishes TCP/IP connections to each workflow.
-
Each workflow is also notified about other workflows in the cluster.
-
Each workflow connects to all workflows with an
id
less than theid
of this workflow. -
This creates a multiconnected network in which each workflow is directly connected to all other workflows.
Although by default, the transport layer uses a regular socket. TCPSocket
, the Julia cluster can provide its own transport.
Julia has two built-in cluster managers.:
-
The
LocalManager
is used when callingaddprocs()
oraddprocs(np::Integer)
; -
'SSHManager` is used when calling
addprocs(hostnames::Array)
with a list of host names.
The LocalManager
is used to run additional workflows on the same host, allowing the use of multi-core and multiprocessor hardware.
- Thus, the cluster manager must meet at least the following requirements
addprocs(manager::FooManager)
requires the implementation of the following methods in the FooManager
.
function launch(manager::FooManager, params::Dict, launched::Array, c::Condition)
[...]
end
function manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)
[...]
end
For example, let’s see how the LocalManager
manager is implemented, which is responsible for running workflows on the same host.
struct LocalManager <: ClusterManager
np::Integer
end
function launch(manager::LocalManager, params::Dict, launched::Array, c::Condition)
[...]
end
function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol)
[...]
end
Method launch
accepts the following arguments:
Method launch
is called asynchronously in a separate task. Completion of this task signals that all requested workflows are running. Therefore, exiting the function launch
SHOULD be performed as soon as all requested workflows are running.
The running workflows are connected to each other and to the main process according to the "all to all" scheme. If you specify the command line argument --worker[=<cookie>]
, the running processes will initialize themselves as workers and establish connections via TCP/IP sockets.
All the workflows in the cluster use the same cookie file, which is the main process. If the cookie is not specified (when using the '--worker` option), the workflow tries to read it from the standard input stream. The LocalManager
and SSHManager
pass the cookie to the new workflows through their standard input streams.
By default, the workflow listens on a free port at the address returned as a result of the call. getipaddr()
. Using the optional --bind-to bind_addr[:port]
argument, you can specify a specific listening address. This can be useful if the host is connected to multiple physical data lines.
An example of a non-TCP/IP transport in an implementation is MPI. In this case, the --worker
parameter CANNOT be specified. Instead, new worker processes should call init_worker(cookie)
before using any parallel constructs.
For each running workflow, the method launch
must add a WorkerConfig
object to launched
(with initialization of the corresponding fields).
mutable struct WorkerConfig
# Поля, общие для всех диспетчеров кластеров
io::Union{IO, Nothing}
host::Union{AbstractString, Nothing}
port::Union{Integer, Nothing}
# Используется при запуске дополнительных рабочих процессов на хосте
count::Union{Int, Symbol, Nothing}
exename::Union{AbstractString, Cmd, Nothing}
exeflags::Union{Cmd, Nothing}
# Внешние диспетчеры кластеров могут использовать этот объект для хранения информации на уровне отдельного рабочего процесса
# Для хранения нескольких полей можно использовать словарь.
userdata::Any
# Подключения к рабочим процессам через SSHManager или туннель SSH
tunnel::Union{Bool, Nothing}
bind_addr::Union{AbstractString, Nothing}
sshflags::Union{Cmd, Nothing}
max_parallel::Union{Integer, Nothing}
# Используется локальными диспетчерами или диспетчерами SSH
connect_at::Any
[...]
end
Most of the fields in the WorkerConfig
are used by the built-in dispatchers. User cluster managers usually specify only io
or host
and `port'.
-
If the
io
field is set, information about the host and port is read from it. The Julia workflow outputs its binding address and port at startup. This allows Julia workflows to listen on any available port instead of manually configuring ports for workflows. -
If the
io
field is not specified, thehost
andport
fields are used for connection. -
The fields
count
,exename
andexeflags
are required to run additional workflows from this one. For example, a cluster manager can start one workflow on a node and use it to run additional workflows.-
If the
count
field has an integer value ofn
, a total ofn
worker processes are started. -
If the
count
field has the value:auto
, the number of running workflows is equal to the number of CPU threads (logical cores) on the computer. -
The 'exename` field contains the name of the
julia
executable file, including the full path. -
The
exeflags
field specifies the command line arguments with which the workflows should be started.
-
-
The fields
tunnel
,bind_addr', `sshflags
and `max_parallel' are used when an SSH tunnel is required to connect to workflows from the main one. -
userdata
is provided for user cluster managers to store their own workflow information.
The manage(manager' method::FooManager, id::Integer, config::WorkerConfig, op::Symbol)
is called at different points during the existence of the workflow with the corresponding values of op
:
-
With the value
:register
or:deregister
when adding or removing a process in the Julia workflow pool. -
With the value
:interrupt
when calling `interrupt(workers)'. The ClusterManager must send an interrupt signal to the corresponding workflow.; -
With the value
:finalize
for cleaning purposes.
Cluster managers with custom transports
It takes a little more effort to replace the default all-to-all connections via TCP/IP sockets with a custom transport layer. The number of interaction tasks for the Julia process is equal to the number of workflows it is connected to. For example, consider a Julia cluster with 32 processes in an all-to-all multiconnected network.
-
Thus, each Julia process has 31 interaction tasks.
-
Each task processes all incoming messages from a single remote workflow through a message processing loop.
-
The message processing loop is waiting for the
IO
object (for example,TCPSocket
in the default implementation), reads the entire message, processes it, and waits for the next one. -
Messages are sent to the process directly from any Julia task — not just from interaction tasks — again via the corresponding 'IO` object.
The new implementation, replacing the default transport, should establish connections to remote workflows and provide appropriate IO
objects that message processing cycles can expect. Implementations require the following dispatcher-related callbacks.
connect(manager::FooManager, pid::Integer, config::WorkerConfig)
kill(manager::FooManager, pid::Int, config::WorkerConfig)
The default implementation (using TCP/IP sockets) looks like connect(manager::ClusterManager, pid::Integer, config::WorkerConfig)
.
The 'connect` method should return a pair of IO
objects: one of them is designed to read data sent from the pid
workflow, and the other is designed to write data sent to the pid
workflow. User cluster managers can use the BufferStream object in memory as an intermediary for data transfer between user transport, possibly without IO objects, and the built-in Julia infrastructure for parallel processing.
A `BufferStream' is an object placed in memory. 'IOBuffer', which works as `IO': it is a stream that can be processed asynchronously.
In the folder clustermanager/0mq
in https://github.com/JuliaAttic/Examples [Examples repositories] there is an example of using ZeroMQ to connect Julia using the zvezda topology with a 0MQ broker in the center. Note. All Julia processes are still logically connected to each other — any one of them can send messages to any other directly, without taking into account that 0MQ is used as the transport layer.
When using custom transports, consider the following features.
-
Julia workflows should NOT be started with the '--worker` parameter. When started with the '--worker` parameter, new worker processes use TCP/IP socket-based transport by default.
-
For each incoming logical connection to the workflow, you must call
Base.process_messages(rd::IO, wr::IO)()'. As a result, a new task is launched that is responsible for reading and writing messages that are sent to and from the workflow and represented by `IO
objects. -
As part of the workflow initialization, it is not necessary to call
init_worker(cookie, manager::FooManager)
. -
The
connect_at::Any
field inWorkerConfig
can be set by the cluster manager when callinglaunch
. The value of this field is passed in all callbacks.connect
. As a rule, it contains information about how to connect to the workflow. For example, TCP/IP socket-based transport uses this field to specify the tuple(host, port)
, that is, the host and port to connect to the workflow.
To delete a workflow from the cluster, the kill(manager, pid, config)
method is called. For proper cleaning, the corresponding IO
objects must be closed in the main process. The default implementation simply executes the exit()
call for the specified remote workflow.
There is an example in the clustermanager/simple
folder in the Examples repository that demonstrates a simple implementation using UNIX domain sockets to configure a cluster.
Network requirements for LocalManager and SSHManager
Julia clusters are designed to work in already secure environments based on infrastructure such as on-premises computers, department clusters, or even the cloud. This section is dedicated to the network security requirements for the built-in dispatchers LocalManager
and `SSHManager'.
-
The main process is not listening on any port. It just connects to the workflows.
-
Each workflow binds to only one local interface and listens to a temporary port, the number of which is assigned by the operating system.
-
The 'LocalManager` manager used by the 'addprocs(N)` function is bound to the loopback interface only by default. This means that worker processes started later on remote hosts (possibly by an attacker) cannot connect to the cluster. If you call
addprocs(["remote_host"])
afteraddprocs(4)
, it will fail. Sometimes users may need to create a cluster consisting of their local system and several remote systems. To do this, you can explicitly request binding of theLocalManager
to an external network interface using the named argumentrestrict
:addprocs(4; restrict=false)
. -
The 'SSHManager` manager, used by the
addprocs(list_of_remote_hosts)
function, runs workflows on remote hosts via SSH protocol. By default, SSH protocol is used only for running Julia workflows. For further communication between the worker processes and between them and the main process, regular TCP/IP sockets without encryption are used. Password-free login must be enabled on remote hosts. Additional SSH flags or credentials can be specified using the namedsshflags
argument. -
Calling
addprocs(list_of_remote_hosts; tunnel=true, sshflags=<ssh keys and other flags>)
is useful if SSH connections also need to be used for interaction between the main and worker processes. A typical example is when the Julia REPL environment (i.e., the main process) runs on a local computer, while the rest of the cluster runs in a cloud such as Amazon EC2. In this case, only port 22 needs to be opened in the remote cluster, and the SSH client is authenticated using the Public Key Infrastructure (PKI). Credentials can be provided usingsshflags
, for example $sshflags=-i <keyfile>
$.In the "all to all" topology (by default), all worker processes connect to each other via regular TCP sockets. Thus, the security policy on the cluster nodes should ensure that it is possible to freely connect between workflows using a range of temporary ports (depending on the OS).
The protection and encryption of all traffic between workflows (via SSH) or the encryption of individual messages can be performed by the user dispatcher
ClusterManager
. -
If the function
multiplex=true
is called with the parameter 'addprocs`, SSH multiplexing creates a tunnel between the main and worker processes. If you have configured SSH multiplexing yourself and the connection is already established, SSH multiplexing is used regardless of the value of themultiplex
parameter. If multiplexing is enabled, forwarding is configured using an existing connection (the-O forward
option in SSH). This can be useful if your servers require password-based authentication: Julia authentication can be avoided by logging in to the server before callingaddprocs
. The control socket will be located in~/.ssh/julia-%r@%h:%p
during the session, unless an existing multiplexed connection is used. Please note: If you create multiple processes on a node and enable multiplexing, the bandwidth may be limited, as in this case the processes share a single TCP connection with multiplexing.
Cluster cookie
All the worker processes in the cluster use the same cookie, which by default is a randomly generated string in the main process.
-
cluster_cookie()
returns the cookie, whilecluster_cookie(cookie)()
sets
его и возвращает новый файл cookie.
-
All connections are authenticated on both sides so that only worker processes started by the main process can connect to each other.
-
A cookie can be passed to workflows when they are started using the
--worker=<cookie>
argument. If the--worker
argument is used without specifying a cookie, the workflow tries to read the cookie from the standard input stream (stdin
). After receiving the cookie, thestdin
stream is immediately closed. -
ClusterManager can receive a cookie file from the main process by calling the method
cluster_cookie()
. Cluster managers that do not use TCP/IP transport by default (and therefore do not specify the--worker
parameter) should callinit_worker(cookie, manager)
with the cookie file of the main process.
Please note that in environments with higher security requirements, a custom implementation of `ClusterManager' may be used. For example, cookies may be provided in advance and therefore not specified as an argument at startup.
Specifying the network topology (experimental function)
Named argument topology' passed to the function 'addprocs
, defines how workflows should connect to each other:
-
`:all_to_all' (default value) — all worker processes connect to each other;
-
:master_worker' — only the main process, i.e. the process with `pid
1, connects to the worker processes; -
:custom
— thelaunch
method of the cluster manager sets the connection topology using theident
andconnect_idents
fields in theWorkerConfig'. A worker process with the identifier `ident
provided by the cluster manager connects to all the worker processes specified inconnect_idents
.
The named argument lazy=true|false
is valid only if the topology
parameter has the value :all_to_all'. If the value is set to `true
, immediately after the cluster is started, the main process is connected to all workers. Connections between specific workflows are established on the first remote call between them. This helps to reduce the amount of resources allocated initially for interaction within the cluster. Connections are established depending on the needs of the parallel program at runtime. The default value for the `lazy' argument is `true'.
Currently, sending a message between unconnected worker processes results in an error. This functionality and its associated interface should be considered experimental in nature and may change in future releases.
Important external packages
In addition to implementing parallelism in Julia, there are many external packages worth mentioning. For example, https://github.com/JuliaParallel/MPI.jl [MPI.jl
] is the Julia wrapper for the MPI protocol. https://github.com/JuliaParallel/Dagger.jl [Dagger.jl
] provides functionality similar to the library https://dask.org /[Dask] in Python, and https://github.com/JuliaParallel/Distributedarrays.jl [DistributedArrays.jl
] supports distributed array operations in multiple workflows, as already described. above.
It is also necessary to mention the Julia GPU programming ecosystem, which includes the following components.
-
CUDA.jl includes various CUDA libraries and supports compilation of Julia cores for Nvidia GPUs.
-
oneAPI.jl serves as a wrapper for the unified oneAPI programming model and supports the execution of Julia kernels on supported accelerators. Currently, only Linux is supported.
-
AMDGPU.jl includes AMD ROCm libraries and supports compilation of Julia cores for AMD GPUs. Currently, only Linux is supported.
-
High-level libraries such as https://github.com/JuliaGPU/KernelAbstractions.jl [KernelAbstractions.jl], https://github.com/mcabbott/Tullio.jl [Tullio.jl] and https://github.com/JuliaComputing/ArrayFire.jl [ArrayFire.jl].
In the following example, we use DistributedArrays.jl
and CUDA.jl
to distribute the array across multiple processes. To do this, we will first bring it up using distribute()
and `CuArray()'.
Don’t forget that DistributedArrays.jl
must be imported into all processes using @everywhere
.
$ ./julia -p 4
julia> addprocs()
julia> @everywhere using DistributedArrays
julia> using CUDA
julia> B = ones(10_000) ./ 2;
julia> A = ones(10_000) .* π;
julia> C = 2 .* A ./ B;
julia> all(C .≈ 4*π)
true
julia> typeof(C)
Array{Float64,1}
julia> dB = distribute(B);
julia> dA = distribute(A);
julia> dC = 2 .* dA ./ dB;
julia> all(dC .≈ 4*π)
true
julia> typeof(dC)
DistributedArrays.DArray{Float64,1,Array{Float64,1}}
julia> cuB = CuArray(B);
julia> cuA = CuArray(A);
julia> cuC = 2 .* cuA ./ cuB;
julia> all(cuC .≈ 4*π);
true
julia> typeof(cuC)
CuArray{Float64,1}
In the following example, we use DistributedArrays.jl
and CUDA.jl
to distribute an array across multiple processes and call a universal function for it.
function power_method(M, v)
for i in 1:100
v = M*v
v /= norm(v)
end
return v, norm(M*v) / norm(v) # или (M*v) ./ v
end
'power_method` systematically creates vectors and normalizes them. We did not specify the type signature in the function declaration. Let’s see if this works for the previously mentioned data types.
julia> M = [2. 1; 1 1];
julia> v = rand(2)
2-element Array{Float64,1}:
0.40395
0.445877
julia> power_method(M,v)
([0.850651, 0.525731], 2.618033988749895)
julia> cuM = CuArray(M);
julia> cuv = CuArray(v);
julia> curesult = power_method(cuM, cuv);
julia> typeof(curesult)
CuArray{Float64,1}
julia> dM = distribute(M);
julia> dv = distribute(v);
julia> dC = power_method(dM, dv);
julia> typeof(dC)
Tuple{DistributedArrays.DArray{Float64,1,Array{Float64,1}},Float64}
To conclude this brief overview of external packages, consider MPI.jl
, the Julia wrapper for the MPI protocol. It would take too long to review all the internal functions, so it would be better to just evaluate the protocol implementation approach.
Let’s consider a simple script that calls each subprocess, creates an instance of its rank, and summarizes the ranks when the main process is reached.
import MPI
MPI.Init()
comm = MPI.COMM_WORLD
MPI.Barrier(comm)
root = 0
r = MPI.Comm_rank(comm)
sr = MPI.Reduce(r, MPI.SUM, root, comm)
if(MPI.Comm_rank(comm) == root)
@printf("sum of ranks: %s\n", sr)
end
MPI.Finalize()
mpirun -np 4 ./julia example.jl