Tasks completed in weeks 5 to 6

  1. Implemented Parallel Bellman-Ford shortest paths without memory overhead.
  2. Implemented Parallel Pagerank.
  3. Implemented Parallel Kruskal.
  4. Implemented Multi-threaded Centrality Measures.

Details

1. Parallel Bellman-Ford without memory overhead

Referring to the post titled “Weeks 3 to 4”, I wrote a parallel implementation of Bellman-Ford that provides a dists and parents array to each thread.

I was able to make the algorithm thread-safe without using atomic operations or local memory.

Replace:

 
function seq_bellman_ford(g::AbstractGraph, src, distmx::AbstractMatrix)
    dists = fill(typemax(eltype(g)), nv(g))
    dists[src] = 0
    active = Set([src])

    for _ in 1:nv(g)
        new_active = Set{eltype(g)}()
        for u in active
            for v in outneighbors(g, u)
                relax_dist = distmx[u, v] + dists[u]
                if dists[v] > relax_dist
                    dists[v] = relax_dist
                    union!(new_active, v)
                end
            end
        end
        active = new_active
        isempty(active) && break
    end
    return dists
end

With:

 
function parallel_bellman_ford(g::AbstractGraph, src, distmx::AbstractMatrix)
    dists = fill(typemax(eltype(g)), nv(g))
    dists[src] = 0
    active = Set([src])

    for _ in 1:nv(g)
        # Relax the out-neighbours of the previously active vertices.
        active = Set(v for u in active for v in outneighbors(g, u))
        prev_dists = deepcopy(dists)

        @threads for v in collect(active)
            for u in inneighbors(g, v)
                prev_dists[u] == typemax(eltype(g)) && continue # u not reached yet; avoid overflow
                relax_dist = prev_dists[u] + distmx[u, v]
                if dists[v] > relax_dist
                    dists[v] = relax_dist # Only the thread handling v writes to dists[v]
                end
            end
        end

        empty!(active)
        for v in vertices(g) # Obtain new_active separately
            if dists[v] < prev_dists[v]
                union!(active, v)
            end
        end
        isempty(active) && break
    end
    return dists
end

Thus, I made the multi-threaded loop thread-safe by relaxing along in-edges instead of out-edges: each thread only ever writes to dists[v] for the vertices v it owns. The trade-off is that the code will likely iterate over more edges than necessary, i.e. instead of iterating over outneighbors(g, active), it iterates over inneighbors(g, outneighbors(g, active)).

I also attempted to parallelise it using atomic operations. However, the run-time shot through the roof with that implementation.
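
For reference, here is a rough sketch of what such an atomic variant might look like (this is only a sketch, not the exact code I tried; it stores every distance in a Threads.Atomic and relaxes all vertices in every round instead of maintaining an active set):

using Base.Threads # for @threads, Atomic, atomic_min!

# Assumes the LightGraphs API (nv, vertices, outneighbors) is in scope.
function atomic_bellman_ford(g::AbstractGraph, src, distmx::AbstractMatrix)
    T = eltype(distmx)
    dists = [Atomic{T}(typemax(T)) for _ in 1:nv(g)]
    dists[src][] = zero(T)

    for _ in 1:nv(g)
        @threads for u in vertices(g)
            du = dists[u][]
            du == typemax(T) && continue # u not reached yet
            for v in outneighbors(g, u)
                # atomic_min! makes concurrent relaxations of dists[v] safe,
                # but every relaxation now pays for an atomic operation.
                atomic_min!(dists[v], du + distmx[u, v])
            end
        end
    end
    return [d[] for d in dists]
end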

Benchmarks


nthreads() = 4

src = 1

distmx = sparse(g)

for e in edges(g)
  distmx[e.src, e.dst] = distmx[e.dst, e.src] = rand(0:n)
end


g = loadsnap(:facebook_combined) (|V| = 4039, |E| = 88234)

seq_bellman_ford_shortest_paths(g, src, distmx): 62.604 s (92898 allocations: 764.61 MiB)

parallel_bellman_ford_shortest_paths(g, src, distmx): 33.319 s (28290 allocations: 251.70 MiB)


2. Parallel Pagerank

The sequential code can be broadly described as:

function pagerank(g::AbstractGraph, α, n, ϵ)
    x = fill(1 / nv(g), nv(g))
    xlast = fill(1 / nv(g), nv(g))
    for _ in 1:n
        # DO SOMETHING

        for e in edges(g)
            u, v = src(e), dst(e)
            xlast[v] += α * x[u] / outdegree(g, u)
            if !is_directed(g)
                xlast[u] += α * x[v] / outdegree(g, v)
            end
        end

        # DO SOMETHING
    end
end
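
For context, the two “DO SOMETHING” steps are typically the teleportation/damping update and the convergence check. The sketch below shows what a complete sequential iteration might look like; it is a generic power-iteration PageRank with made-up defaults (α = 0.85, n = 100, ϵ = 1.0e-6), not the package's exact code:

# Rough sketch of a full power-iteration step, assuming the LightGraphs API.
# Dangling-node handling is omitted for brevity.
function pagerank_sketch(g::AbstractGraph, α = 0.85, n = 100, ϵ = 1.0e-6)
    N = nv(g)
    x = fill(1 / N, N)
    xlast = similar(x)
    for _ in 1:n
        fill!(xlast, (1 - α) / N)      # teleportation / damping term
        for e in edges(g)
            u, v = src(e), dst(e)
            xlast[v] += α * x[u] / outdegree(g, u)
            if !is_directed(g)
                xlast[u] += α * x[v] / outdegree(g, v)
            end
        end
        err = sum(abs.(x .- xlast))    # change since the last iteration
        x, xlast = xlast, x
        err < N * ϵ && break           # converged
    end
    return x
end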

My first attempt to parallelize the code was:

function parallel_pagerank_1(g::AbstractGraph, α, n, ϵ)
    x = fill(1 / nv(g), nv(g))
    xlast = fill(1 / nv(g), nv(g))
    for _ in 1:n
        # DO SOMETHING

        @threads for v in vertices(g)
            for u in inneighbors(g, v)
                xlast[v] += α * x[u] / outdegree(g, u)
            end
        end

        # DO SOMETHING
    end
end

The implementation is perfectly thread-safe, since only the thread handling vertex v writes to xlast[v]. One issue I noticed is that @threads evenly divides the iteration space (vertices(g)), so one thread can end up iterating over many more edges than the others. Note that @threads for e in edges(g) would not be thread-safe, because two threads could update xlast[v] for the same destination v at the same time.

E.g. consider the case where nthreads() = 2 and the input graph is a star graph with 8 nodes.

Threads 1 and 2 will be assigned nodes {1, 2, 3, 4} and {5, 6, 7, 8} respectively.

Assuming node 1 is the internal node, thread 1 will traverse the 7 in-edges of node 1, {(2, 1), …, (8, 1)}, plus the edges {(1, 2), (1, 3), (1, 4)}, for 10 edge traversals in total, while thread 2 only traverses the 4 edges {(1, 5), (1, 6), (1, 7), (1, 8)}.

The optimal way to divide the workload would be to assign nodes {1} and {2, 3, 4, 5, 6, 7, 8} to threads 1 and 2 respectively. Then both threads traverse the same number of edges (7 each).

Since the same iteration space is reused in every one of the n iterations, it is worth spending some time up front finding a partition of the vertices that divides the edges more evenly.

The problem of finding an optimal “load balanced” partition is NP-hard, so no polynomial-time algorithm is known. I wrote a simple greedy heuristic, weighted_partition(weights, required_partitions), to obtain a reasonable partition; a sketch of the idea is shown after the code below.

function parallel_pagerank(g::AbstractGraph, α, n, ϵ)
    partition = weighted_partition(indegree(g), nthreads())

    x = fill(1 / nv(g), nv(g))
    xlast = fill(1 / nv(g), nv(g))
    for _ in 1:n
        # DO SOMETHING

        @threads for v_set in partition
            for v in v_set
                for u in inneighbors(g, v)
                    xlast[v] += α * x[u] / outdegree(g, u)
                end
            end
        end

        # DO SOMETHING
    end
end
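
The heuristic itself can be sketched as below. This is a simplified illustration with hypothetical internals, not necessarily the exact weighted_partition in the package: it repeatedly hands the heaviest remaining vertex to the partition with the smallest running total.

# Simplified greedy load balancing: assign vertices (heaviest first) to the
# partition with the smallest total weight so far.
function weighted_partition(weights::AbstractVector, required_partitions::Integer)
    parts = [Int[] for _ in 1:required_partitions]
    totals = zeros(eltype(weights), required_partitions)
    for v in sortperm(weights, rev = true)  # heaviest vertices first
        i = argmin(totals)                  # partition with the least work so far
        push!(parts[i], v)
        totals[i] += weights[v]
    end
    return parts
end

On the star-graph example above with nthreads() = 2, this sketch produces the partition [[1], [2, 3, 4, 5, 6, 7, 8]], which matches the optimal split.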

Benchmarks

nthreads() = 4

g = loadsnap(:ego_twitter_u) (|V| = 81306, |E| = 1342310)

pagerank(g): 192.269 ms (10 allocations: 1.86 MiB)

parallel_pagerank(g): 70.239 ms (23 allocations: 3.72 MiB)

g = random_regular_graph(10000, 2000) (|V| = 10,000, |E| = 10,000,000)

pagerank(g): 1.404 s (10 allocations: 234.75 KiB)

parallel_pagerank(g): 237.902 ms (20 allocations: 469.50 KiB)


3. Parallel Kruskal

I attempted to parallelize the find_cross_edge portion of the algorithm, i.e. given a set of connected components and an array of edges, find the edge of minimum index (in the edge list) whose end-points lie in two different connected components.

These implementations are meant for algorithms of the following format:

  1. Initialise each vertex to be in a separate connected component.
  2. Search for the cross-edge e of minimum index using find_cross_edge!.
  3. If no such e is found then return.
  4. Merge the connected components containing the end-points of e.
  5. Go to step 2.

The set of connected components of the graph is maintained using a disjoint-set data structure.

The sequential algorithm can be described as follows:

 
function find_cross_edge!(connected_vs, edge_list, start_ind = 1)
    for ind in start_ind:length(edge_list)
        e = edge_list[ind]
        if not_same_set!(connected_vs, e.src, e.dst)
            return ind, e
        end
    end
    return nothing # no cross-edge remains
end
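
For context, the driver loop from the numbered steps above might look roughly like the sketch below (hypothetical names; it assumes DataStructures.jl's IntDisjointSets for connected_vs, an edge list already sorted by weight, and the find_cross_edge! helper above):

using DataStructures # for IntDisjointSets and union!

function kruskal_mst_sketch(g::AbstractGraph, edge_list) # edge_list sorted by weight
    connected_vs = IntDisjointSets(nv(g)) # 1. one component per vertex
    mst = eltype(edge_list)[]
    start_ind = 1
    while true
        found = find_cross_edge!(connected_vs, edge_list, start_ind) # 2.
        found === nothing && return mst                              # 3.
        start_ind, e = found
        union!(connected_vs, e.src, e.dst)                           # 4. merge the two components
        push!(mst, e)
        start_ind += 1                                               # 5. resume the search after e
    end
end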

I wrote the parallel version roughly as:

 
function parallel_find_cross_edge!(connected_vs, edge_list, local_start_ind = collect(1:nthreads()))
    best_ind = length(edge_list) + 1
    @threads for thread_id in 1:nthreads()
        i = local_start_ind[thread_id]

        while i < best_ind
            e = edge_list[i]
            if not_same_set!(connected_vs, e.src, e.dst)
                local_start_ind[thread_id] = i # Update local_start_ind for subsequent calls.
                best_ind = min(best_ind, i)
                break
            end
            i += nthreads()
        end
    end
    best_ind > length(edge_list) && return nothing # no cross-edge remains
    return best_ind, edge_list[best_ind]
end

Basically, thread i only checks the edges with indices {i, i + nthreads(), i + 2*nthreads(), …}. The caveat is that connected_vs is a disjoint-set data structure, so not_same_set!(connected_vs, e.src, e.dst) performs read-write operations on shared state (as does best_ind = min(best_ind, i)). We therefore need atomic operations or thread-local memory to make this thread-safe.

In the local-memory implementation, every thread is given its own disjoint-set data structure, local_connected_vs, to make queries on; the “merge connected component” operation then has to be performed on each of the local_connected_vs. A sketch is shown below.
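
Roughly, each thread scans its stride of the edge list against its private copy of the disjoint sets, and the minimum index found across threads is taken at the end; the subsequent merge then has to be replayed on every local copy. (Hypothetical names; not the exact code I benchmarked.)

using Base.Threads # for @threads, nthreads

function local_memory_find_cross_edge!(local_connected_vs, edge_list, local_start_ind = collect(1:nthreads()))
    best_inds = fill(length(edge_list) + 1, nthreads())
    @threads for thread_id in 1:nthreads()
        connected_vs = local_connected_vs[thread_id] # thread-private disjoint sets
        i = local_start_ind[thread_id]
        while i <= length(edge_list) # no shared best_ind, so no early cut-off
            e = edge_list[i]
            if not_same_set!(connected_vs, e.src, e.dst)
                local_start_ind[thread_id] = i
                best_inds[thread_id] = i # each thread writes only its own slot
                break
            end
            i += nthreads()
        end
    end
    best_ind = minimum(best_inds)
    best_ind > length(edge_list) && return nothing
    return best_ind, edge_list[best_ind]
end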

Results

I benchmarked the parallel implementations against the sequential implementation on Anubis using 10 threads, with an input graph of 10^8 edges. Both implementations of parallel_find_cross_edge! were at least 10x slower than the sequential implementation.

  1. The issue with the implementation using atomic operations is that atomic operations are much costlier than ordinary operations (at least 100x). In fact, I noticed that the run-time had an almost linear relationship with the number of atomic operations used.

  2. The issue with the implementation using local memory is that queries to a disjoint-set data structure become faster with every subsequent use (path compression flattens the trees). By giving each thread its own local_connected_vs, fewer queries were made on each data structure, so they stayed slow. Also, the “merge connected component” operation has to be performed on each of the local_connected_vs.

4. Multi-threaded Centrality Measures

I wrote thread-safe, multi-threaded implementations of

  • Betweenness
  • Closeness
  • Stress

These algorithms are of the form:

function centrality(g::AbstractGraph, vertex_list, distmx::AbstractMatrix)
    info = zeros(nv(g))
    for v in vertex_list
        dijk_info = dijkstra(g, v, distmx)
        info += get_info(dijk_info)
    end
    return info
end

They are easy to parallelize as the Dijkstra calls can be run independently, but the accumulation into info must be done in a thread-safe manner.

Parallel implementations using processes already exist. They are usually more efficient than threads, but they require more memory (each process copies the graph). Hence, the multi-threaded implementations are more useful when memory is limited.

function threaded_centrality(g::AbstractGraph, vertex_list, distmx::AbstractMatrix)
    local_info = [zeros(nv(g)) for _ in 1:nthreads()]
    @threads for v in vertex_list
        dijk_info = dijkstra(g, v, distmx)
        local_info[threadid()] += get_info(dijk_info) # each thread accumulates into its own array
    end
    return reduce(+, local_info)
end

Although this created a significant speed-up over the sequential version, the pre-existing distributed implementations of the centrality measures performed just as well. The benefit of the multi-threaded implementations over the distributed ones is that you do not need to copy the graph into multiple processes.

Benchmarks


g = loadsnap(:facebook_combined) (|V| = 4039, |E| = 88234)

distmx = sparse(g)

for e in edges(g)
  distmx[e.src, e.dst] = distmx[e.dst, e.src] = rand(0:n)
end

vs = collect(vertices(g))

nthreads() = 4

nworkers() = 4



betweenness_centrality(g, vs, distmx): 59.572 s (116871141 allocations: 8.77 GiB)

threaded_betweenness_centrality(g, vs, distmx): 30.474 s (107868756 allocations: 9.08 GiB)

distr_betweenness_centrality(g, vs, distmx): 29.564 s (21577 allocations: 4.80 MiB)



closeness_centrality(g, distmx): 56.938 s (66577274 allocations: 6.10 GiB)

threaded_closeness_centrality(g, distmx): 26.270 s (63217567 allocations: 5.84 GiB)

distr_closeness_centrality(g, distmx): 26.689 s (22472 allocations: 4.54 MiB)



stress_centrality(g, distmx): 11.132 s (24771184 allocations: 3.54 GiB)

threaded_stress_centrality(g, distmx): 6.724 s (23077828 allocations: 3.32 GiB)

distr_stress_centrality(g, distmx): 6.696 s (21470 allocations: 4.74 MiB)