where the functions are convex but not necessarily differentiable. The absolute value function and the 1-norm are typical examples. Important applications also arise from Lagrangian relaxation. The idea of the algorithm is to approximate the functions with piecewise linear models which are built up from information obtained by evaluating at different points. We iteratively minimize over the models to generate candidate solution points.
We can state the algorithm as
Choose starting point .
For , evaluate and update corresponding model .
Let the next
candidate be the minimizer of .
If not converged, goto step 2.
If it is costly to evaluate , then the algorithm is naturally parallelizable at step 2. The minimization in step 3 can be computed by solving a linear optimization problem, which is usually very fast. (Let me point out here that Julia has interfaces to linear programming and other optimization solvers under JuliaOpt.)
Abstracting the math, we can write the algorithm using the following Julia code.
# functions initialize, isconverged, solvesubproblem, and process implemented elsewhere
state, subproblems = initialize()
while !isconverged(state)
results = map(solvesubproblem,subproblems)
state, subproblems = process(state, results)
end
The function solvesubproblem
corresponds to evaluating for a given and (the elements of subproblems
could be tuples (i,x)
). The function process
corresponds to minimizing the model in step 3, and it produces a new state and a new set of subproblems to solve. Note that the algorithm looks much like a map-reduce that would be easy to parallelize using many existing frameworks. Indeed, in Julia we can simply replace map
with pmap
(parallel map). Let's consider a twist that makes the parallelism not so straightforward.
solvesubproblem
function can lead to load imbalance and limit parallel efficiency as workers sit idle waiting for new tasks. Such variability arises naturally if solvesubproblem
itself requires solving a optimization problem, or if the workers and network are shared, as is often the case with cloud computing. We can consider a new variant of the cutting-plane algorithm to address this issue. The key point is
When proportion of subproblems for a given candidate have been solved, generate a new candidate and corresponding set of subproblems by using whatever information is presently available.
In other words, we generate new tasks to feed to workers without needing to wait for all current tasks to complete, making the algorithm asynchronous. The algorithm remains convergent, although the total number of iterations may increase. For more details, see this paper by Jeff Linderoth and Stephen Wright.
By introducing asynchronicity we can no longer use a nice black-box pmap
function and have to dig deeper into the parallel implementation. Fortunately, this is easy to do in Julia.
Julia implements distributed-memory parallelism based on one-sided message passing, where process push work onto others (via remotecall
) and the results are retrieved (via fetch
) by the process which requires them. Macros such as @spawn
and @parallel
provide pretty syntax around this low-level functionality. This model of parallelism is very different from the typical SIMD style of MPI. Both approaches are useful in different contexts, and I expect an MPI wrapper for Julia will appear in the future (see also here).
Reading the manual on parallel computing is highly recommended, and I won't try to reproduce it in this post. Instead, we'll dig into and extend one of the examples it presents.
The implementation of pmap
in Julia is
function pmap(f, lst)
np = nprocs() # determine the number of processors available
n = length(lst)
results = cell(n)
i = 1
# function to produce the next work item from the queue.
# in this case it's just an index.
next_idx() = (idx=i; i+=1; idx)
@sync begin
for p=1:np
if p != myid() || np == 1
@spawnlocal begin
while true
idx = next_idx()
if idx > n
break
end
results[idx] = remotecall_fetch(p, f, lst[idx])
end
end
end
end
end
results
end
On first sight, this code is not particularly intuitive. The @spawnlocal
macro creates a task on the master process (e.g. process 1). Each task feeds work to a corresponding worker; the call remotecall_fetch(p, f, lst[idx])
function calls f
on process p
and returns the result when finished. Tasks are uninterruptable and only surrender control at specific points such as remotecall_fetch
. Tasks cannot directly modify variables from the enclosing scope, but the same effect can be achieved by using the next_idx
function to access and mutate i
. The task idiom functions in place of using a loop to poll for results from each worker process. Implementing our asynchronous algorithm is not much more than a modification of the above code:
# given constants n and 0 < alpha <= 1
# functions initialize and solvesubproblem defined elsewhere
np = nprocs()
state, subproblems = initialize()
converged = false
isconverged() = converged
function updatemodel(mysubproblem, result)
# store result
...
# decide whether to generate new subproblems
state.numback[mysubproblem.parent] += 1
if state.numback[mysubproblem.parent] >= alpha*n && !state.didtrigger[mysubproblem.parent]
state.didtrigger[mysubproblem.parent] = true
# generate newsubproblems by solving linear optimization problem
...
if ... # convergence test
converged = true
else
append!(subproblems, newsubproblems)
push!(state.didtrigger, false)
push!(state.numback, 0)
# ensure that for s in newsubproblems, s.parent == length(state.numback)
end
end
end
@sync begin
for p=1:np
if p != myid() || np == 1
@spawnlocal begin
while !isconverged()
if length(subproblems) == 0
# no more subproblems but haven't converged yet
yield()
continue
end
mysubproblem = shift!(subproblems) # pop subproblem from queue
result = remotecall_fetch(p, solvesubproblem, mysubproblem)
updatemodel(mysubproblem, result)
end
end
end
end
end
where state
is an instance of a type defined as type State
didtrigger::Vector{Bool}
numback::Vector{Int}
...
end
There is little difference in the structure of the code inside the @sync
blocks, and the asynchronous logic is encapsulated in the local updatemodel
function which conditionally generates new subproblems. A strength of Julia is that functions like pmap
are implemented in Julia itself, so that it is particularly straightforward to make modifications like this. solvesubproblem
entails solving a relatively small linear optimization problem. Test instances are taken from the previously mentioned paper. We'll first run on a large multicore server. The runals.jl
(asynchronous L-shaped) file contains the algorithm we'll use. Its usage is
julia runals.jl [data source] [num subproblems] [async param] [block size]
where [num subproblems]
is the
as above and [async param]
is the proportion
. By setting
we obtain the synchronous algorithm. For the asynchronous version we will take
. The [block size]
parameter controls how many subproblems are sent to a worker at once (in the previous code, this value was always 1). We will use 4000 subproblems in our experiments.
To run multiple Julia processes on a shared-memory machine, we pass the -p N
option to the julia
executable, which will start up N
system processes. To execute the asynchronous version with 10 workers, we run
julia -p 12 runals.jl Data/storm 4000 0.6 30
Note that we start 12 processes. These are the 10 workers, the master (which distributes tasks), and another process to perform the master's computations (an additional refinement which was not described above). Results from various runs are presented in the table below.
Synchronous | Asynchronous | |||
No. Workers | Speed | Efficiency | Speed | Efficiency |
10 | 154 | Baseline | 166 | Baseline |
20 | 309 | 100.3% | 348 | 105% |
40 | 517 | 84% | 654 | 98% |
60 | 674 | 73% | 918 | 92% |
There are a few more hoops to jump through in order to run on EC2. First we must build a system image (AMI) with Julia installed. Julia connects to workers over ssh, so I found it useful to put my EC2 ssh key on the AMI and also set StrictHostKeyChecking no
in /etc/ssh/ssh_config
to disable the authenticity prompt when connecting to new workers. Someone will likely correct me on if this is the right approach.
Assuming we have an AMI in place, we can fire up the instances. I used an m3.xlarge instance for the master and m1.medium instances for the workers. (Note: you can save a lot of money by using the spot market.)
To add remote workers on startup, Julia accepts a file with a list of host names through the --machinefile
option. We can generate this easily enough by using the EC2 API Tools (Ubuntu package ec2-api-tools
) with the command
ec2-describe-instances | grep running | awk '{ print $5; }' > mfile
On the master instance we can then run
julia --machinefile mfile runatr.jl Data/storm 4000 0.6 30
Results from various runs are presented in the table below.
Synchronous | Asynchronous | |||
No. Workers | Speed | Efficiency | Speed | Efficiency |
10 | 149 | Baseline | 151 | Baseline |
20 | 289 | 97% | 301 | 99.7% |
40 | 532 | 89% | 602 | 99.5% |
On both architectures the asynchronous version solves subproblems at a higher rate and has significantly better parallel efficiency. Scaling is better on EC2 than on the shared-memory server likely because the subproblem calculation is memory bound, and so performance is better on the distributed-memory architecture. Anyway, with Julia we can easily experiment on both.