Tasks completed in weeks 11 to 12

  1. Implemented lock-free parallel Breadth-First Search with dynamic load balancing.
  2. Improved sequential PageRank.
  3. Implemented parallel random heuristics.
  4. Created a poster for my GSoC project.

Details

1. Lock-free parallel BFS with dynamic load balancing

Refer to sections 3, 4 and 5 of the previous post titled “Weeks 9 to 10”; this section is not intended to make sense without them.

I wrote a parallel version of breadth-first search that uses dynamic load balancing. The current level is partitioned into nthreads() vectors, collectively named cur_level_t. Thread thread_id explores the out-neighbours of the vertices in its partition, cur_level_t[thread_id]. When thread thread_id has finished working on cur_level_t[thread_id], it searches for a thread t that has not finished iterating over cur_level_t[t] and steals some vertices from cur_level_t[t].
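As a small illustration of the stealing order (steal_order is a hypothetical helper written only for this post; it mirrors the loop over (thread_id:n_t, 1:(thread_id-1)) in dynamic_parallel_gdistances! below): each thread visits its own queue first and then the remaining queues, wrapping around.

# Hypothetical helper mirroring the queue-visiting order used in the BFS below.
steal_order(thread_id, n_t) = [thread_id:n_t; 1:(thread_id-1)]

steal_order(2, 4)   # [2, 3, 4, 1]: thread 2 finishes its own queue, then steals from queues 3, 4 and 1.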

# Obtain equal sized partitions of sources and place partition
# i into queue_list[i].
function partition_sources!(
    queue_list::Vector{Vector{T}},
    sources::Vector{<:Integer},
    empty_list::Vector{Bool}
    ) where T<:Integer
    
    partitions = LightGraphs.unweighted_contiguous_partition(length(sources), length(queue_list))
    for (i, p) in enumerate(partitions)
        append!(queue_list[i], sources[p])
        empty_list[i] = isempty(p)
    end
end
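A usage sketch (hypothetical numbers; the exact split is chosen by LightGraphs.unweighted_contiguous_partition, but each queue receives a contiguous, roughly equal chunk of the sources):

using LightGraphs

queue_list = [Vector{Int}() for _ in 1:4]   # one queue per thread
empty_list = zeros(Bool, 4)
partition_sources!(queue_list, collect(1:10), empty_list)
# queue_list now holds four contiguous chunks of 1:10;
# empty_list[i] is true only if queue i received no sources.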

function dynamic_parallel_gdistances!(
    g::AbstractGraph{T}, 
    sources::Vector{<:Integer},
    vert_level::Vector{T};
    queue_segment_size::Integer=20
    ) where T <:Integer
 
    nvg = nv(g)
    n_t = nthreads()
    segment_size = convert(T, queue_segment_size) # Type stability
    fill!(vert_level, typemax(T))
    visited = zeros(Bool, nvg)

    # Level queues
    next_level_t = [sizehint!(Vector{T}(), cld(nvg, n_t)) for _ in Base.OneTo(n_t)]
    cur_level_t = [sizehint!(Vector{T}(), cld(nvg, n_t)) for _ in Base.OneTo(n_t)]
    cur_front_t = ones(T, n_t)
    queue_explored_t = zeros(Bool, n_t)

    for s in sources    
        visited[s] = true
        vert_level[s] = zero(T)
    end

    partition_sources!(cur_level_t, sources, queue_explored_t)
    is_cur_level_t_empty = isempty(sources)
    n_level = zero(T)

    while !is_cur_level_t_empty
        n_level += one(T)

        @threads for thread_id in Base.OneTo(n_t)
            # Explore current level in parallel
            @inbounds next_level = next_level_t[thread_id]

            @inbounds for t_range in (thread_id:n_t, 1:(thread_id-1)), t in t_range
                queue_explored_t[t] && continue
                cur_level = cur_level_t[t]
                cur_len = length(cur_level)

                # Explore cur_level_t[t] one segment at a time.
                while true 
                    local_front = cur_front_t[t]  # Data race, but first read always succeeds
                    cur_front_t[t] += segment_size # Failure of increment is acceptable

                    (local_front > cur_len || local_front <= zero(T)) && break                    
                    while local_front <= cur_len && cur_level[local_front] != zero(T)
                        v = cur_level[local_front]
                        cur_level[local_front] = zero(T)
                        local_front += one(T)

                        # Check if v was successfully read.
                        (visited[v] && vert_level[v] == n_level-one(T)) || continue
                        for i in outneighbors(g, v)
                            # Data race, but first read on visited[i] always succeeds
                            if !visited[i]
                                vert_level[i] = n_level
                                # Concurrent visited[i] = true always succeeds
                                visited[i] = true
                                push!(next_level, i)
                            end
                        end
                    end   
                end
                queue_explored_t[t] = true        
            end      
        end

        is_cur_level_t_empty = true
        @inbounds for t in Base.OneTo(n_t)
            cur_level_t[t], next_level_t[t] = next_level_t[t], cur_level_t[t]
            cur_front_t[t] = one(T)
            empty!(next_level_t[t])
            queue_explored_t[t] = isempty(cur_level_t[t])
            is_cur_level_t_empty = is_cur_level_t_empty && queue_explored_t[t]
        end               
    end

    return vert_level
end
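A usage sketch (hypothetical; it assumes Julia was started with several threads, e.g. JULIA_NUM_THREADS=4, and that the two functions above are defined):

using LightGraphs

g = random_regular_graph(10_000, 6)
vert_level = zeros(Int, nv(g))
dynamic_parallel_gdistances!(g, [1], vert_level)
# vert_level[v] is now the distance (BFS level) of v from vertex 1;
# unreachable vertices keep the value typemax(Int).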

In the above implementation, each thread reads v = cur_level_t[t][local_front] and then writes cur_level_t[t][local_front] = zero(T) to mark that vertex as claimed.

cur_front_t[t] is the left-most index of cur_level_t[t] that has not been read.

Each thread copies local_front = cur_front_t[t] and then increments cur_front_t[t] by queue_segment_size, so that the next thread to read cur_front_t[t] cannot claim the vertices at indices local_front:(local_front+queue_segment_size-1).
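For example, one possible interleaving with queue_segment_size = 20 on some queue t:

# Thread A: local_front = cur_front_t[t]    # reads 1
#           cur_front_t[t] += 20            # front becomes 21; A owns indices 1:20
# Thread B: local_front = cur_front_t[t]    # reads 21
#           cur_front_t[t] += 20            # front becomes 41; B owns indices 21:40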

1. Data races on visited[i]

This is the same as the data races on visited in parallel BFS with static load balancing.

2. Data race on cur_front_t[t]

Multiple threads: 
local_front = cur_front_t[t]
cur_front_t[t] += segment_size

The above code can cause a data race on cur_front_t[t].

It is easy to see that if at least one thread reads cur_front_t[t] as one(T), then all the vertices in cur_level_t[t] will be explored. As a direct consequence of assumption I, this will happen.

3. Data race on cur_level_t[t]

Thread 1: v = cur_level_t[t][local_front]

Thread 2: cur_level_t[t][local_front] = zero(T)

The above code can cause a data race on cur_level_t[t][local_front]. As a direct consequence of assumption I, we know that at least one thread will successfully perform v = cur_level_t[t][local_front].

Now we only have to worry about a corrupted read producing a garbage value v':

v' = cur_level_t[t][local_front]
vert_level[v'] = n_level

This is avoided by the statement:

(visited[v] && vert_level[v] == n_level-one(T)) || continue

If (visited[v] && vert_level[v] == n_level-one(T)) is true, then the following statements must have been completed in the previous level.

if !visited[i] 
    vert_level[i] = n_level
    visited[i] = true

Due to assumption III, it is not possible for concurrent writes by multiple threads to have caused a corrupted read of both vert_level[i] and visited[i] simultaneously.

4. Data race on vert_level[i]

Multiple threads: vert_level[i] = n_level

The above code can cause a concurrent write/write data race. As a consequence of assumption II, vert_level[i] should still have the correct value.

Benchmarks


nthreads() = 4

sources = [1,]



g = random_regular_graph(200000, 400) (|V| = 200000, |E| = 40000000)

gdistances(g, sources): 136.752 ms (7 allocations: 3.08 MiB)

static_parallel_distances(g, sources): 87.601 ms (24 allocations: 9.20 MiB)

dynamic_parallel_distances(g, sources): 79.350 ms (32 allocations: 12.26 MiB)



g = loadsnap(:ego_twitter_u) (|V| = 81306, |E| = 1342310)

gdistances(g, sources): 13.741 ms (6 allocations: 1.25 MiB)

static_parallel_distances(g, sources): 13.046 ms (26 allocations: 3.88 MiB)

dynamic_parallel_distances(g, sources): 7.362 ms (33 allocations: 4.98 MiB)



g = loadsnap(:ca_astroph) (|V| = 17903, |E| = 197031)

gdistances(g, sources): 2.544 ms (6 allocations: 282.38 KiB)

static_parallel_distances(g, sources): 2.466 ms (29 allocations: 875.94 KiB)

dynamic_parallel_distances(g, sources): 1.206 ms (34 allocations: 1.11 MiB)



g = loadsnap(:facebook_combined) (|V| = 4039, |E| = 88234)

gdistances(g, sources): 412.740 μs (6 allocations: 64.05 KiB)

static_parallel_distances(g, sources): 343.053 μs (24 allocations: 198.97 KiB)

dynamic_parallel_distances(g, sources): 194.763 μs (30 allocations: 258.70 KiB)

2. Improved Sequential PageRank

As explained in the post titled “Weeks 5 to 6”, the pagerank code is of the form:

function old_pagerank(g::AbstractGraph, α, n, ϵ)
    x = fill(1/nv(g), nv(g))
    x_last = fill(1/nv(g), nv(g))
    for _ in 1:n
        # DO SOMETHING

        for e in edges(g)
            u, v = src(e), dst(e)
            x_last[v] += α * x[u] / outdegree(g, u)
            if !is_directed(g)
                x_last[u] += α * x[v] / outdegree(g, v)
            end
        end

        # DO SOMETHING
    end
end

I replaced the inner loop with one that iterates over each vertex's in-neighbours.

function pagerank(g::AbstractGraph, α, n, ϵ)
    x = fill(1/nv(g), nv(g))
    x_last = fill(1/nv(g), nv(g))
    for _ in 1:n
        # DO SOMETHING

        for v in vertices(g)
            for u in inneighbors(g, v)
                x_last[v] += α * x[u] / outdegree(g, u)
            end
        end

        # DO SOMETHING
    end
end

Although the same amount of “work” (number of operations) is being done, this implementation uses memory more cache-efficiently: all updates to x_last[v] for a given vertex v happen consecutively, so x_last is written in a single sequential sweep rather than at scattered positions as the edge list is traversed.
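To make the access-pattern argument concrete, here is a hedged sketch of the same inner loop (inside pagerank, so g, x, x_last and α are as above) with the per-vertex sum accumulated in a local variable before a single write to x_last[v]; this is only an illustration of the idea, not necessarily what the final implementation does.

for v in vertices(g)
    s = 0.0
    for u in inneighbors(g, v)
        s += x[u] / outdegree(g, u)   # scattered reads of x, but ...
    end
    x_last[v] += α * s                # ... exactly one write per vertex
end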

Benchmarks


nthreads() = 4



g = loadsnap(:ego_twitter_u) (|V| = 81306, |E| = 1342310)

old_pagerank(g): 290.723 ms (10 allocations: 1.86 MiB)

pagerank(g): 95.063 ms (10 allocations: 1.86 MiB)

parallel_pagerank(g): 53.639 ms (21 allocations: 2.48 MiB)



g = loadsnap(:ca_astroph) (|V| = 17903, |E| = 197031)

old_pagerank(g): 53.715 ms (10 allocations: 420.00 KiB)

pagerank(g): 15.944 ms (10 allocations: 420.00 KiB)

parallel_pagerank(g): 10.485 ms (23 allocations: 560.44 KiB)



g = loadsnap(:facebook_combined) (|V| = 4039, |E| = 88234)

old_pagerank(g): 27.367 ms (10 allocations: 95.06 KiB)

pagerank(g): 8.610 ms (10 allocations: 95.06 KiB)

parallel_pagerank(g): 5.273 ms (26 allocations: 127.33 KiB)

3. Parallel Random Heuristics

As explained in the post titled “Weeks 1 to 2”, some algorithms will return a random solution to a problem.

E.g., the Maximum Independent Set problem asks for the largest set of vertices in a graph such that no two vertices in the set are adjacent.

The function random_maximal_independent_set(g) returns a random independent set S such that every vertex not in S is adjacent to a vertex in S.

We usually make Reps calls to random_maximal_independent_set(g) and return the largest set among them. The repeated calls can be made in parallel since they are isolated from each other.
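As a tiny illustration of why repeated random runs help (a hypothetical example graph, not one of the benchmark graphs):

using LightGraphs

g = SimpleGraph(5)                      # the path 1 - 2 - 3 - 4 - 5
for (u, v) in [(1, 2), (2, 3), (3, 4), (4, 5)]
    add_edge!(g, u, v)
end
# {2, 4} is a maximal independent set (every other vertex has a neighbour in it),
# but the maximum one is {1, 3, 5}. A single random run may return either,
# so keeping the best result over many runs gives a better answer on average.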

I wrote two functions, generate_min_set(g, gen_func, Reps) and generate_max_set(g, gen_func, Reps). generate_min_set(g, gen_func, Reps) makes Reps calls to gen_func(g) and returns the smallest set; you can guess what generate_max_set(g, gen_func, Reps) does.

The distributed implementation of generate_min_set can be described as:

using Distributed # for @distributed

function distr_generate_min_set(
    g::AbstractGraph{T},
    gen_func::Function,
    Reps::Integer
    ) where T<:Integer
    # Type assert required for type stability
    min_set::Vector{T} = @distributed ((x, y)->length(x) < length(y) ? x : y) for _ in 1:Reps
        gen_func(g)
    end
    return min_set
end
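The reducer (x, y) -> length(x) < length(y) ? x : y is applied pairwise by @distributed, so the smallest set produced by any worker survives. The same reduction on plain data (hypothetical values, just to show the behaviour):

reduce((x, y) -> length(x) < length(y) ? x : y, [[1, 2, 3], [4], [5, 6]])  # returns [4]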

The multi-threaded version of generate_min_set can be described as:

function threaded_generate_min_set(
    g::AbstractGraph{T}, 
    gen_func::Function, 
    Reps::Integer
    ) where T<: Integer 
    n_t = Base.Threads.nthreads()
    is_undef = ones(Bool, n_t)
    min_sets = [Vector{T}() for _ in 1:n_t]

    Base.Threads.@threads for _ in 1:Reps
        t = Base.Threads.threadid()
        next_set = gen_func(g)
        if is_undef[t] || length(next_set) < length(min_sets[t])
            min_sets[t] = next_set
            is_undef[t] = false
        end
    end

    min_ind = 0
    for i in filter((j)->!is_undef[j], 1:n_t)
        if min_ind == 0 || length(min_sets[i]) < length(min_sets[min_ind])
            min_ind = i
        end
    end

    return min_sets[min_ind]
end
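A usage sketch for both versions (hypothetical setup: gen_func = minimal_vertex_cover as in the benchmarks below, four worker processes for the distributed version, and Julia started with JULIA_NUM_THREADS=4 for the threaded one; the two functions above and minimal_vertex_cover must also be available on the workers):

using Distributed
addprocs(4)                          # workers for distr_generate_min_set
@everywhere using LightGraphs
g = loadsnap(:facebook_combined)
cover_d = distr_generate_min_set(g, minimal_vertex_cover, 400)
cover_t = threaded_generate_min_set(g, minimal_vertex_cover, 40)
length(cover_d), length(cover_t)     # sizes of the best covers found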

Benchmarks


gen_func = minimal_vertex_cover

nthreads() = 4

nworkers() = 4

Reps_t = 40

Reps_d = 400



g = loadsnap(:ego_twitter_u) (|V| = 81306, |E| = 1342310)

gen_func(g): 68.255 ms (43 allocations: 42.98 MiB)

threaded_generate_min_set(g, gen_func, Reps_t): 1.529 s (1753 allocations: 1.66 GiB)

distr_generate_min_set(g, gen_func, Reps_d): 14.454 s (419345 allocations: 101.94 MiB)



g = loadsnap(:ca_astroph) (|V| = 17903, |E| = 197031)

gen_func(g): 7.442 ms (39 allocations: 6.52 MiB)

threaded_generate_min_set(g, gen_func, Reps_t): 175.882 ms (1590 allocations: 242.64 MiB)

distr_generate_min_set(g, gen_func, Reps_d): 2.034 s (86002 allocations: 16.69 MiB)



g = loadsnap(:facebook_combined) (|V| = 4039, |E| = 88234)

gen_func(g): 2.870 ms (35 allocations: 2.82 MiB)

threaded_generate_min_set(g, gen_func, Reps_t): 69.671 ms (1362 allocations: 79.12 MiB)

distr_generate_min_set(g, gen_func, Reps_d): 721.288 ms (21644 allocations: 7.29 MiB)

4. GSoC Project Poster

Julia’s GSoC students are invited to attend JuliaCon 2018 and present their projects. We each have to create a poster that briefly describes our project and accomplishments. My poster can be found here.