Tasks completed in weeks 5 to 6
- Implemented Parallel Bellman-Ford Shortest Paths without memory overhead.
- Implemented Parallel Pagerank.
- Implemented Parallel Kruskal.
- Implemented Multi-threaded Centrality Measures.
Details
1. Parallel Bellman-Ford without memory overhead
Referring to the post titled “Weeks 3 to 4”, I wrote a parallel implementation of Bellman-Ford that provides a dists and parents array to each thread.
I was able to make the algorithm thread-safe without using atomic operations or local memory.
I made the multi-threaded loop thread-safe by relaxing along the in-edges instead of the out-edges. The trade-off is that the code will likely end up iterating over more edges than necessary: instead of iterating over outneighbors(g, active), it iterates over inneighbors(g, outneighbors(g, active)).
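A minimal sketch of the two loop structures, assuming an active-set formulation of Bellman-Ford; the function relax_in_edges! and its arguments are illustrative names, not the actual LightGraphs code.

```julia
using LightGraphs, Base.Threads

# Out-edge relaxation (NOT thread-safe): two active vertices u1 and u2 may both
# try to update dists[v] for a shared neighbour v at the same time.
#
#   @threads for u in collect(active)
#       for v in outneighbors(g, u)
#           # relax dists[v], parents[v] using dists[u] + distmx[u, v]
#       end
#   end

# In-edge relaxation (thread-safe without atomics): each iteration writes only
# dists[v] and parents[v] for its own vertex v.
function relax_in_edges!(dists, parents, g, active::Set{Int}, distmx)
    to_check = collect(Set(v for u in active for v in outneighbors(g, u)))
    @threads for v in to_check
        for u in inneighbors(g, v)
            u in active || continue              # only relax edges leaving the active set
            relaxed = dists[u] + distmx[u, v]
            if relaxed < dists[v]
                dists[v] = relaxed
                parents[v] = u
            end
        end
    end
    return to_check                              # candidate active set for the next pass
end
```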
I also attempted to parallelise it using atomic operations. However, the run-time shot through the roof with that implementation.
Benchmarks
nthreads() = 4
g = loadsnap(:facebook_combined)
(|V| = 4039, |E| = 88234)
src = 1
distmx = sparse(g)
for e in edges(g)
    distmx[e.src, e.dst] = distmx[e.dst, e.src] = rand(0:nv(g))
end
seq_bellman_ford_shortest_paths(g, src, distmx): 62.604 s (92898 allocations: 764.61 MiB)
parallel_bellman_ford_shortest_paths(g, src, distmx): 33.319 s (28290 allocations: 251.70 MiB)
2. Parallel Pagerank
The sequential code can be broadly described as:
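Roughly, a gather-style power iteration (an illustrative sketch, not the actual LightGraphs source; dangling vertices are ignored for brevity):

```julia
using LightGraphs

# Illustrative power-iteration PageRank in gather form.
function pagerank_sketch(g::AbstractGraph, α=0.85, n_iter=100)
    N = nv(g)
    x = fill(1.0 / N, N)            # current rank estimate
    xlast = copy(x)                 # estimate from the previous iteration
    for _ in 1:n_iter
        xlast, x = x, xlast
        for v in vertices(g)
            s = 0.0
            for u in inneighbors(g, v)
                s += xlast[u] / outdegree(g, u)   # rank flowing from u into v
            end
            x[v] = α * s + (1 - α) / N            # damping / teleportation term
        end
    end
    return x
end
```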
My first attempt to parallelize the code was:
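Essentially, threading the vertex loop of the sketch above (again illustrative, not the actual source):

```julia
using Base.Threads

# First attempt (sketch): parallelise the per-vertex gather. Each iteration
# writes only x[v], so no two threads write to the same location.
function parallel_pagerank_sketch(g::AbstractGraph, α=0.85, n_iter=100)
    N = nv(g)
    x = fill(1.0 / N, N)
    xlast = copy(x)
    for _ in 1:n_iter
        xlast, x = x, xlast
        @threads for v in vertices(g)
            s = 0.0
            for u in inneighbors(g, v)
                s += xlast[u] / outdegree(g, u)
            end
            x[v] = α * s + (1 - α) / N
        end
    end
    return x
end
```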
The implementation is perfectly thread-safe: each iteration writes only to its own entry of the rank vector.
One issue I noticed is that @threads evenly divides the iteration space (vertices(g)), so one thread could end up iterating over many more edges than the others. Note that @threads for e in edges(g) would not be thread-safe.
E.g. consider the case where nthreads() = 2 and the input graph is a star graph with 8 nodes.
Threads 1 and 2 will be assigned nodes {1, 2, 3, 4} and {5, 6, 7, 8} respectively.
Assuming node 1 is the internal node, thread 1 will iterate over the edges {(1, 2), (1, 3), (1, 4), (1, 5), (1, 6), (1, 7), (1, 8), (2, 1), (3, 1), (4, 1)} (10 edge traversals) while thread 2 will iterate over {(5, 1), (6, 1), (7, 1), (8, 1)} (4 edge traversals).
The optimal way to divide the workload would be to assign nodes {1} and {2, 3, 4, 5, 6, 7, 8} to threads 1 and 2 respectively. Then both threads iterate over the same number of edges (7 each).
Since the same iteration space is used multiple (num_iterations) times, it might be useful to find a partition of the vertices that divides the edges more evenly.
The problem of finding an optimal “load balanced” partition is NP-hard (no polynomial-time algorithm is known), so I wrote a simple greedy heuristic to obtain a partition, weighted_partition(weights, required_partitions).
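The signature above comes from the post; the body below is only my guess at what such a greedy heuristic could look like (repeatedly hand the heaviest remaining vertex to the currently lightest part):

```julia
# Illustrative greedy heuristic, not the actual implementation.
function weighted_partition_sketch(weights::AbstractVector{<:Real}, required_partitions::Integer)
    parts = [Int[] for _ in 1:required_partitions]
    loads = zeros(required_partitions)
    for v in sortperm(weights, rev=true)   # heaviest vertices first
        p = argmin(loads)                  # currently lightest part
        push!(parts[p], v)
        loads[p] += weights[v]
    end
    return parts
end

# E.g. partition the vertices by degree so that each thread sees roughly the
# same amount of edge work:
# parts = weighted_partition_sketch(degree(g), nthreads())
```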
Benchmarks
nthreads() = 4
g = loadsnap(:ego_twitter_u)
(|V| = 81306, |E| = 1342310)
pagerank(g): 192.269 ms (10 allocations: 1.86 MiB)
parallel_pagerank(g): 70.239 ms (23 allocations: 3.72 MiB)
g = random_regular_graph(10000, 2000)
(|V| = 10,000, |E| = 10,000,000)
pagerank(g): 1.404 s (10 allocations: 234.75 KiB)
parallel_pagerank(g): 237.902 ms (20 allocations: 469.50 KiB)
3. Parallel Kruskal
I attempted to parallelize the find_cross_edge portion of the algorithm,
i.e. given a set of connected components and an array of edges, find the edge with minimum index (in the edge list) whose end-points are in two different components.
These implementations are meant for algorithms of the following format:
1. Initialise each vertex to be in a separate connected component.
2. Search for the cross-edge e of minimum index using find_cross_edge!.
3. If no such e is found, then return.
4. Merge the connected components containing the end-points of e.
5. Go to step 2.
The set of connected components of the graph is maintained using a Disjoint Set DataStructure.
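A skeleton of this format, assuming the edge list is already sorted by weight and using DataStructures.jl's IntDisjointSets (the names kruskal_like_mst and find_cross_edge are illustrative):

```julia
using LightGraphs, DataStructures

# Skeleton of the loop described above; find_cross_edge is sketched in the
# next snippet.
function kruskal_like_mst(g::AbstractGraph, edge_list::Vector)
    connected_vs = IntDisjointSets(nv(g))              # step 1: each vertex in its own component
    mst = eltype(edge_list)[]
    while true
        i = find_cross_edge(connected_vs, edge_list)   # step 2
        i == 0 && return mst                           # step 3: no cross-edge left
        e = edge_list[i]
        union!(connected_vs, e.src, e.dst)             # step 4: merge the two components
        push!(mst, e)
        # step 5: continue searching from the updated components
    end
end
```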
The sequential algorithm can be described as follows:
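A sketch of the sequential search (the post's not_same_set! helper is replaced here by DataStructures.in_same_set; the convention of returning 0 for “not found” is an assumption):

```julia
using DataStructures

# Return the minimum index i such that edge_list[i] joins two different
# components, or 0 if no such edge exists.
function find_cross_edge(connected_vs::IntDisjointSets, edge_list)
    for (i, e) in enumerate(edge_list)
        if !in_same_set(connected_vs, e.src, e.dst)
            return i
        end
    end
    return 0
end
```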
I wrote the parallel version roughly as:
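The structure was roughly the following (a sketch only; as written below it is NOT yet thread-safe, for the reasons explained next):

```julia
using Base.Threads, DataStructures

# Thread t only checks the edge indices t, t + nthreads(), t + 2*nthreads(), ...
function parallel_find_cross_edge(connected_vs::IntDisjointSets, edge_list)
    best_ind = length(edge_list) + 1
    @threads for t in 1:nthreads()
        for i in t:nthreads():length(edge_list)
            i >= best_ind && break                        # a smaller index was already found
            e = edge_list[i]
            if !in_same_set(connected_vs, e.src, e.dst)   # read-write on the shared connected_vs
                best_ind = min(best_ind, i)               # read-write on the shared best_ind
                break
            end
        end
    end
    return best_ind > length(edge_list) ? 0 : best_ind
end
```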
Basically, thread i would only check the edges with indices {i, i + nthreads(), i + 2*nthreads(), …}.
The caveat here is that connected_vs is a Disjoint Set DataStructure, so not_same_set!(connected_vs, e.src, e.dst) requires read-write operations on shared variables (as does best_ind = min(best_ind, i)). Hence we require atomic operations or local memory to make it thread-safe.
In the local memory implementation, every thread is provided its own Disjoint Set DataStructure, local_connected_vs, to make queries on. The “merge connected component” operation then has to be performed on each of the local_connected_vs.
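A sketch of that variant, assuming local_connected_vs holds one IntDisjointSets per thread (the names and the reduction at the end are illustrative):

```julia
using Base.Threads, DataStructures

# Each thread queries its own copy of the disjoint-set structure and records
# its own best index; the per-thread results are combined at the end.
function local_find_cross_edge(local_connected_vs, edge_list)
    best = fill(length(edge_list) + 1, nthreads())   # per-thread best index
    @threads for t in 1:nthreads()
        ds = local_connected_vs[t]                   # this thread's own structure
        for i in t:nthreads():length(edge_list)
            e = edge_list[i]
            if !in_same_set(ds, e.src, e.dst)
                best[t] = i
                break
            end
        end
    end
    b = minimum(best)
    return b > length(edge_list) ? 0 : b
end

# After a cross-edge e is chosen, the merge has to be replayed on every copy:
# for ds in local_connected_vs
#     union!(ds, e.src, e.dst)
# end
```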
Results
I benchmarked the parallel implementations against the sequential implementations on Anubis using 10 threads. I used an input graph with 10^8 edges.
Both implementations of parallel_find_cross_edge were at best 10x slower than the sequential implementation.
- The issue with the implementation using atomic operations is that atomic operations are much costlier than ordinary operations (at least 100x). In fact, I noticed that the run-time had an almost linear relationship with the number of atomic operations used.
- The issue with the implementation using local memory is that queries to a Disjoint Set DataStructure become faster with every subsequent use (thanks to path compression). By providing each thread with its own local_connected_vs, fewer queries were made on each data structure, so each one benefited less from this speed-up. Also, the “merge connected component” operation has to be performed on each of the local_connected_vs.
4. Multi-threaded Centrality Measures
I wrote thread-safe, multi-threaded implementations of:
- Betweenness
- Closeness
- Stress
These algorithms all share the same structure: run one Dijkstra per source vertex, compute some per-source info, and add it into the result. They are easy to parallelize, since the Dijkstra runs are independent, but the addition of info must be done in a thread-safe manner.
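A rough sketch of the threaded structure, using per-thread accumulators so the addition stays thread-safe (the function name and the contribution callback standing in for the measure-specific info are illustrative, not the actual code):

```julia
using LightGraphs, Base.Threads

# One Dijkstra per source; each thread adds its sources' contributions into its
# own partial vector, and the partials are summed at the end.
function threaded_centrality_skeleton(g::AbstractGraph, vs, distmx, contribution)
    partial = [zeros(nv(g)) for _ in 1:nthreads()]   # one accumulator per thread
    @threads for s in vs
        state = dijkstra_shortest_paths(g, s, distmx)
        # `contribution(state)` stands in for the measure-specific per-source
        # info (dependencies for betweenness, path counts for stress, ...).
        partial[threadid()] .+= contribution(state)
    end
    return reduce(+, partial)                        # thread-safe combination
end
```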
Parallel implementations using processes already exist. They are usually more efficient than threads, but they require more memory (each process copies the graph). Hence, the multi-threaded implementations would be more useful when memory is limited.
Although this created a significant speed-up over the sequential version, the pre-existing distributed implementations of centrality measures performed just as well. The benefit of the multi-threaded implementations over the distributed implementations is that you do not need to copy the data into multiple processes.
Benchmarks
g = loadsnap(:facebook_combined)
(|V| = 4039, |E| = 88234)
distmx = sparse(g)
for e in edges(g)
    distmx[e.src, e.dst] = distmx[e.dst, e.src] = rand(0:nv(g))
end
vs = collect(vertices(g))
nthreads() = 4
nworkers() = 4
betweenness_centrality(g, vs, distmx): 59.572 s (116871141 allocations: 8.77 GiB)
threaded_betweenness_centrality(g, vs, distmx): 30.474 s (107868756 allocations: 9.08 GiB)
distr_betweenness_centrality(g, vs, distmx): 29.564 s (21577 allocations: 4.80 MiB)
closeness_centrality(g, distmx): 56.938 s (66577274 allocations: 6.10 GiB)
threaded_closeness_centrality(g, distmx): 26.270 s (63217567 allocations: 5.84 GiB)
distr_closeness_centrality(g, distmx): 26.689 s (22472 allocations: 4.54 MiB)
stress_centrality(g, distmx): 11.132 s (24771184 allocations: 3.54 GiB)
threaded_stress_centrality(g, distmx): 6.724 s (23077828 allocations: 3.32 GiB)
distr_stress_centrality(g, distmx): 6.696 s (21470 allocations: 4.74 MiB)