Tasks completed in weeks 11 to 12
- Implemented lock-free parallel Breadth-First Search with dynamic load balancing.
- Improved sequential PageRank.
- Implemented parallel random heuristics.
- Created a poster for my GSoC project.
Details
1. Lock-free parallel BFS with dynamic load balancing
Refer to the previous post titled “Weeks 9 to 10”, sections 3, 4, 5. This section is not intended to make sense without it.
I wrote a parallel version of breadth-first search that uses dynamic load balancing. The current level is partitioned into nthreads() vectors named cur_level_t. The thread numbered thread_id explores the out-neighbors of the vertices in its partition cur_level_t[thread_id]. When a thread thread_id has finished working on cur_level_t[thread_id], it searches for a thread t that has not finished iterating over cur_level_t[t] and steals some vertices from cur_level_t[t].
using LightGraphs
using Base.Threads

# Obtain equal sized partitions of sources and place partition
# i into queue_list[i].
function partition_sources!(
    queue_list::Vector{Vector{T}},
    sources::Vector{<:Integer},
    empty_list::Vector{Bool}
    ) where T<:Integer

    partitions = LightGraphs.unweighted_contiguous_partition(length(sources), length(queue_list))
    for (i, p) in enumerate(partitions)
        append!(queue_list[i], sources[p])
        empty_list[i] = isempty(p)
    end
end
function dynamic_parallel_gdistances!(
    g::AbstractGraph{T},
    sources::Vector{<:Integer},
    vert_level::Vector{T};
    queue_segment_size::Integer=20
    ) where T <: Integer

    nvg = nv(g)
    n_t = nthreads()
    segment_size = convert(T, queue_segment_size) # Type stability
    fill!(vert_level, typemax(T))
    visited = zeros(Bool, nvg)

    # Level queues
    next_level_t = [sizehint!(Vector{T}(), cld(nvg, n_t)) for _ in Base.OneTo(n_t)]
    cur_level_t = [sizehint!(Vector{T}(), cld(nvg, n_t)) for _ in Base.OneTo(n_t)]
    cur_front_t = ones(T, n_t)
    queue_explored_t = zeros(Bool, n_t)

    for s in sources
        visited[s] = true
        vert_level[s] = zero(T)
    end
    partition_sources!(cur_level_t, sources, queue_explored_t)
    is_cur_level_t_empty = isempty(sources)
    n_level = zero(T)

    while !is_cur_level_t_empty
        n_level += one(T)

        @threads for thread_id in Base.OneTo(n_t)
            # Explore current level in parallel
            @inbounds next_level = next_level_t[thread_id]

            @inbounds for t_range in (thread_id:n_t, 1:(thread_id-1)), t in t_range
                queue_explored_t[t] && continue
                cur_level = cur_level_t[t]
                cur_len = length(cur_level)

                # Explore cur_level_t[t] one segment at a time.
                while true
                    local_front = cur_front_t[t] # Data race, but first read always succeeds
                    cur_front_t[t] += segment_size # Failure of increment is acceptable

                    (local_front > cur_len || local_front <= zero(T)) && break
                    while local_front <= cur_len && cur_level[local_front] != zero(T)
                        v = cur_level[local_front]
                        cur_level[local_front] = zero(T)
                        local_front += one(T)

                        # Check if v was successfully read.
                        (visited[v] && vert_level[v] == n_level-one(T)) || continue
                        for i in outneighbors(g, v)
                            # Data race, but first read on visited[i] always succeeds
                            if !visited[i]
                                vert_level[i] = n_level
                                # Concurrent visited[i] = true always succeeds
                                visited[i] = true
                                push!(next_level, i)
                            end
                        end
                    end
                end
                queue_explored_t[t] = true
            end
        end

        is_cur_level_t_empty = true
        @inbounds for t in Base.OneTo(n_t)
            cur_level_t[t], next_level_t[t] = next_level_t[t], cur_level_t[t]
            cur_front_t[t] = one(T)
            empty!(next_level_t[t])
            queue_explored_t[t] = isempty(cur_level_t[t])
            is_cur_level_t_empty = is_cur_level_t_empty && queue_explored_t[t]
        end
    end
    return vert_level
end
In the above implementation, each thread reads a vertex with v = cur_level[local_front] and then writes cur_level[local_front] = zero(T) to mark it as taken (cur_level here is cur_level_t[t]). cur_front_t[t] is the left-most index of cur_level_t[t] that has not yet been read. Each thread copies cur_front_t[t] into local_front and immediately advances cur_front_t[t] by queue_segment_size, so that the next thread to read cur_front_t[t] cannot claim the vertices at indices local_front:local_front+queue_segment_size-1.
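In isolation, this claiming pattern behaves as in the following single-threaded toy sketch (illustrative only; claim_segments is not part of the implementation):
# Each claim reads the shared front and then advances it by one segment,
# so later claims start after the indices this claim will read.
function claim_segments(cur_len::Int, segment_size::Int)
    cur_front = 1
    claims = UnitRange{Int}[]
    while true
        local_front = cur_front
        cur_front += segment_size
        local_front > cur_len && break
        push!(claims, local_front:min(local_front + segment_size - 1, cur_len))
    end
    return claims
end

claim_segments(50, 20)   # returns [1:20, 21:40, 41:50]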
The remaining data races are analysed below.
1. Data race on visited[i]
This is the same as the data race on visited in the parallel BFS with static load balancing (see the previous post).
2. Data race on cur_front_t[t]
Multiple threads execute:
local_front = cur_front_t[t]
cur_front_t[t] += segment_size
This can cause a data race on cur_front_t[t]. It is easy to see that if at least one thread reads cur_front_t[t] as one(T), then all the vertices in cur_level_t[t] will be explored. It is a direct consequence of assumption I that this will happen.
3. Data race on cur_level_t[t]
Thread 1: v = cur_level[local_front]
Thread 2: cur_level[local_front] = zero(T)
The above can cause a data race on cur_level[local_front] (where cur_level = cur_level_t[t]). As a direct consequence of assumption I, we know that at least one thread will successfully perform v = cur_level[local_front]. Now we only have to worry about a corrupted read that causes:
v' = cur_level[local_front]
vert_level[v'] = n_level
This is avoided by the statement:
(visited[v] && vert_level[v] == n_level-one(T)) || continue
If (visited[v] && vert_level[v] == n_level-one(T)) is true, then the following statements (with i == v) must have been completed during the previous level:
if !visited[i]
vert_level[i] = n_level
visited[i] = true
Due to assumption III, it is not possible that concurrent writes from multiple threads could have caused a corrupted read on both vert_level[i] and visited[i] simultaneously.
4. Data race on vert_level[i]
Multiple threads: vert_level[i] = n_level
The above can cause a concurrent write/write data race. As a consequence of assumption II, vert_level[i] will still have the correct value.
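Putting the pieces together, here is a minimal usage and sanity-check sketch. The graph, source set, and the use of the Test standard library are illustrative assumptions; only dynamic_parallel_gdistances! and the sequential gdistances come from this post.
# Start Julia with several threads, e.g. JULIA_NUM_THREADS=4.
using LightGraphs, Test

g = random_regular_graph(10_000, 6)
sources = [1, 2]

vert_level = Vector{eltype(g)}(undef, nv(g))   # output buffer, filled by the routine
dynamic_parallel_gdistances!(g, sources, vert_level)

# The parallel result should agree with the sequential BFS.
@test vert_level == gdistances(g, sources)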
Benchmarks
nthreads() = 4
sources = [1,]
g = random_regular_graph(200000, 400) (|V| = 200000, |E| = 40000000)
gdistances(g, sources): 136.752 ms (7 allocations: 3.08 MiB)
static_parallel_distances(g, sources): 87.601 ms (24 allocations: 9.20 MiB)
dynamic_parallel_distances(g, sources): 79.350 ms (32 allocations: 12.26 MiB)
g = loadsnap(:ego_twitter_u) (|V| = 81306, |E| = 1342310)
gdistances(g, sources): 13.741 ms (6 allocations: 1.25 MiB)
static_parallel_distances(g, sources): 13.046 ms (26 allocations: 3.88 MiB)
dynamic_parallel_distances(g, sources): 7.362 ms (33 allocations: 4.98 MiB)
g = loadsnap(:ca_astroph) (|V| = 17903, |E| = 197031)
gdistances(g, sources): 2.544 ms (6 allocations: 282.38 KiB)
static_parallel_distances(g, sources): 2.466 ms (29 allocations: 875.94 KiB)
dynamic_parallel_distances(g, sources): 1.206 ms (34 allocations: 1.11 MiB)
g = loadsnap(:facebook_combined) (|V| = 4039, |E| = 88234)
gdistances(g, sources): 412.740 μs (6 allocations: 64.05 KiB)
static_parallel_distances(g, sources): 343.053 μs (24 allocations: 198.97 KiB)
dynamic_parallel_distances(g, sources): 194.763 μs (30 allocations: 258.70 KiB)
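Timings like these can be gathered with BenchmarkTools.jl, roughly as in the sketch below. The static_parallel_distances and dynamic_parallel_distances wrappers are not shown in this post, so the sketch times the mutating routine defined above instead, and it assumes loadsnap comes from SNAPDatasets.jl.
using LightGraphs, BenchmarkTools
using SNAPDatasets   # assumed source of loadsnap

sources = [1]
g = loadsnap(:ego_twitter_u)
vert_level = Vector{eltype(g)}(undef, nv(g))

# Interpolate globals with $ so the benchmark does not measure global-variable overhead.
@btime gdistances($g, $sources)
@btime dynamic_parallel_gdistances!($g, $sources, $vert_level)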
2. Improved Sequential PageRank
As explained in the post titled “Weeks 5 to 6”, the pagerank code is of the form:
function old_pagerank(g::AbstractGraph, α, n, ϵ)
    x = fill(1/nv(g), nv(g))
    x_last = fill(1/nv(g), nv(g))
    for _ in 1:n
        # DO SOMETHING
        for e in edges(g)
            u, v = src(e), dst(e)
            x_last[v] += α * x[u] / outdegree(g, u)
            if !is_directed(g)
                x_last[u] += α * x[v] / outdegree(g, v)
            end
        end
        # DO SOMETHING
    end
end
I replaced the inner loop with one that iterates over each vertex's in-neighbors.
function pagerank(g::AbstractGraph, α, n, ϵ)
    x = fill(1/nv(g), nv(g))
    x_last = fill(1/nv(g), nv(g))
    for _ in 1:n
        # DO SOMETHING
        for v in vertices(g)
            for u in inneighbors(g, v)
                x_last[v] += α * x[u] / outdegree(g, u)
            end
        end
        # DO SOMETHING
    end
end
Although the same amount of “work” (number of operations) is being done, this implementation uses memory more cache-efficiently: all updates to x_last[v] for a given vertex v now happen consecutively, instead of being scattered across the iteration over edges.
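For context, here is a self-contained sketch of what a full iteration might look like with the elided steps filled in by a standard damping term and an L1 convergence check. This is an illustrative assumption, not the actual LightGraphs implementation, and it ignores dangling vertices (outdegree zero) for brevity.
using LightGraphs

function pagerank_sketch(g::AbstractGraph, α=0.85, n=100, ϵ=1.0e-6)
    N = nv(g)
    x = fill(1.0 / N, N)
    x_last = similar(x)
    for _ in 1:n
        fill!(x_last, (1 - α) / N)          # teleportation / damping term
        for v in vertices(g)                # accumulate contributions from in-neighbors
            for u in inneighbors(g, v)
                x_last[v] += α * x[u] / outdegree(g, u)
            end
        end
        err = sum(abs(x_last[v] - x[v]) for v in vertices(g))
        x, x_last = x_last, x               # swap buffers
        err < N * ϵ && return x
    end
    error("pagerank did not converge in $n iterations")
end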
Benchmarks
nthreads() = 4
g = loadsnap(:ego_twitter_u) (|V| = 81306, |E| = 1342310)
old_pagerank(g): 290.723 ms (10 allocations: 1.86 MiB)
pagerank(g): 95.063 ms (10 allocations: 1.86 MiB)
parallel_pagerank(g): 53.639 ms (21 allocations: 2.48 MiB)
g = loadsnap(:ca_astroph) (|V| = 17903, |E| = 197031)
old_pagerank(g): 53.715 ms (10 allocations: 420.00 KiB)
pagerank(g): 15.944 ms (10 allocations: 420.00 KiB)
parallel_pagerank(g): 10.485 ms (23 allocations: 560.44 KiB)
g = loadsnap(:facebook_combined) (|V| = 4039, |E| = 88234)
old_pagerank(g): 27.367 ms (10 allocations: 95.06 KiB)
pagerank(g): 8.610 ms (10 allocations: 95.06 KiB)
parallel_pagerank(g): 5.273 ms (26 allocations: 127.33 KiB)
3. Parallel Random Heuristics
As explained in the post titled “Weeks 1 to 2”, some algorithms return a random solution to a problem. For example, the Maximum Independent Set problem asks for the largest set of vertices in a graph such that no two of them are adjacent. The function random_maximal_independent_set(g) returns a random independent set S such that every vertex not in S is adjacent to a vertex in S. We usually make Reps calls to random_maximal_independent_set(g) and return the largest set among them. These repeated calls can be made in parallel since they are independent of each other.
I wrote two functions, generate_min_set(g, gen_func, Reps) and generate_max_set(g, gen_func, Reps). generate_min_set(g, gen_func, Reps) makes Reps calls to gen_func(g) and returns the smallest of the returned sets. You can guess what generate_max_set(g, gen_func, Reps) does.
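A sequential reference with the same semantics looks like this (a sketch for illustration, not code from the project):
using LightGraphs

# Call gen_func(g) Reps times and keep the smallest result.
function sequential_generate_min_set(g::AbstractGraph, gen_func::Function, Reps::Integer)
    min_set = gen_func(g)
    for _ in 2:Reps
        next_set = gen_func(g)
        if length(next_set) < length(min_set)
            min_set = next_set
        end
    end
    return min_set
end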
The distributed implementation of generate_min_set can be described as:
using Distributed

function distr_generate_min_set(
    g::AbstractGraph{T},
    gen_func::Function,
    Reps::Integer
    ) where T<:Integer

    # Type assert required for type stability
    min_set::Vector{T} = @distributed ((x, y)->length(x) < length(y) ? x : y) for _ in 1:Reps
        gen_func(g)
    end
    return min_set
end
The multi-threaded version of generate_min_set
can be described as:
function threaded_generate_min_set(
    g::AbstractGraph{T},
    gen_func::Function,
    Reps::Integer
    ) where T<:Integer

    n_t = Base.Threads.nthreads()
    is_undef = ones(Bool, n_t)
    min_sets = [Vector{T}() for _ in 1:n_t]

    Base.Threads.@threads for _ in 1:Reps
        t = Base.Threads.threadid()
        next_set = gen_func(g)
        if is_undef[t] || length(next_set) < length(min_sets[t])
            min_sets[t] = next_set
            is_undef[t] = false
        end
    end

    min_ind = 0
    for i in filter((j)->!is_undef[j], 1:n_t)
        if min_ind == 0 || length(min_sets[i]) < length(min_sets[min_ind])
            min_ind = i
        end
    end

    return min_sets[min_ind]
end
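A hypothetical usage sketch, reusing the gen_func from the benchmarks below (the heuristic name follows this post, and loadsnap is assumed to come from SNAPDatasets.jl):
using LightGraphs, SNAPDatasets

g = loadsnap(:facebook_combined)
# Keep the smallest vertex cover found over 40 independent runs.
cover = threaded_generate_min_set(g, minimal_vertex_cover, 40)
length(cover)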
Benchmarks
gen_func = minimal_vertex_cover
nthreads() = 4
nworkers() = 4
Reps_t = 40
Reps_d = 400
g = loadsnap(:ego_twitter_u) (|V| = 81306, |E| = 1342310)
gen_func(g): 68.255 ms (43 allocations: 42.98 MiB)
threaded_generate_min_set(g, gen_func, Reps_t): 1.529 s (1753 allocations: 1.66 GiB)
distr_generate_min_set(g, gen_func, Reps_d): 14.454 s (419345 allocations: 101.94 MiB)
g = loadsnap(:ca_astroph) (|V| = 17903, |E| = 197031)
gen_func(g): 7.442 ms (39 allocations: 6.52 MiB)
threaded_generate_min_set(g, gen_func, Reps_t): 175.882 ms (1590 allocations: 242.64 MiB)
distr_generate_min_set(g, gen_func, Reps_d): 2.034 s (86002 allocations: 16.69 MiB)
g = loadsnap(:facebook_combined) (|V| = 4039, |E| = 88234)
gen_func(g): 2.870 ms (35 allocations: 2.82 MiB)
threaded_generate_min_set(g, gen_func, Reps_t): 69.671 ms (1362 allocations: 79.12 MiB)
distr_generate_min_set(g, gen_func, Reps_d): 721.288 ms (21644 allocations: 7.29 MiB)
4. GSoC Project Poster
Julia’s GSoC students have been invited to attend JuliaCon 2018 and present our projects. We each have to create a poster that briefly describes our project and accomplishments. My poster can be found here.