Tasks completed in weeks 7 to 8

  1. Explored and implemented degree based graph partitioning.
  2. Implemented Batch Priority Queue.
  3. Improved Prim’s Minimum Spanning Tree implementation.
  4. Improved Dijkstra’s Single Source Shortest Path implementation.

Details

1. Degree based Graph Partitioning

As explained in the post titled “Weeks 5 to 6”, I found that Parallel Pagerank had an improvement in performance by partitioning the vertices and assigning it to a processor in an attempt to balance the number of edges assigned to each processor. To avoid race condition, no two processors could be assigned edges that have the same destination vertex.

By balance, we mean to minimise that largest partition.

I restricted the partitioning to contiguous partitions because the partitions can be expressed as integer ranges instead of arrays that contain a total of |V| integers.

I had used a simple greedy heuristic to partition the edges.

The code is of the form:

function greedy_partition(degree_list::Vector{Integer}, num_part::Int)
    num = length(degree_list)
    suffix_sum = [sum(degree_list[i:num]) for i in 1:(num+1)]
    
    left = 1
    for part_remain in reverse(1:(num_part-1))
        right = left
        
        part_size = 0
        current_balance = max(part_size, suffix_sum[right+1]/part_remain)
        
        next_part_size = degree_list[right+1]
        next_balance = max(next_part_size, suffix_sum[right+2]/part_remain)

        while current_balance >= next_balance && right+1 < num
            partition_size = next_part_size
            current_balance = next_balance

            right += 1
            next_part_size += degree_list[right+1]
            next_balance = max(next_part_size, suffix_sum[right+2]/part_remain)
        end
        push!(partitions, left:right)
        left = right + 1
    end
    push!(partitions, left:num)

    return partitions
end

The code simply finds right such that:

max(sum(degree_list[left:right]), sum(degree_list[right+1:n])/partitions_remain)

is minimised.

I then wrote a simple code to find the optimal partition using binary search. The basic idea is you can check if it is possible to partition the vertices such that no partition has weight more than M in a single iteration of the vertices.

function binary_search_it(degree_list::Vector{Int}, num_part, M)
    possible = true

    sum_part = 0
    remain_part = num_part
    for w in degree_list
        sum_part += w
        if sum_part > M #Transfer to next partition.
            sum_part = w
            remain_part -= 1
            if remain_part == 0 # No more partition exists to transfer to.
                possible = false
                break
            end
        end
    end
    return possible
end

up_bound can be set to sum(degree_list).

low_bound can be set to fld(sum(degree_list), num_part).

Each iteration requires O(|V|) time.

The binary search will require log(sum(degree_list)) = log(|E|)+1 iterations.

This requries O(|V|*log(|E|)) time which is not much costlier than the O(|V|) time greedy approach.

However, when running both the partitioning functions on graphs from SNAPSDatasets.jl, I found that the partitions were very simillar and the balance did not differ.

Example


g = loadsnap(:ego_twitter_u)


greedy_contiguous_partition(indegree(g), 4): [1:8055, 8056:22812, 22813:44295, 44296:81306]


optimal_contiguous_partition(indegree(g), 4): [1:8055, 8056:22812, 22813:44297, 44298:81306]


Using the optimal algorithm over the greedy algorithm for page rank on Ego Twitter Graph resulted in a minor increase in run-time (65ms to 73ms). This was expected considering the partitions have almost no difference but the latter is costlier. I decided to use optimal_contiguous_partition for parallel_pagerank because the run-time of parallel_pagerank is far larger than the run-time of partitioning.

2. Batch Priority Queue

I had an idea to run Dijkstra and Prim in parallel using a Batch Priority Queue. The Dijkstra algorithm can be losely expressed as.

function dijkstra(g::AbstractGraph, source, distmx::AbstractMatrix)
    dists = fill(typemax(eltype(g)), nv(g))
    dists[source] = 0
    pq = make_priorityqueue(enumerate(dists))

    while !is_empty(pq)
        u = dequeue!(pq)
        for v in outneighbors(g, u)
            relax_dist = dists[u]+distmx[u, v]
            if new_dists < dists[u]
                dists[v] = relax_dists
                decrease_key!(pq, v, new_dists)
            end
        end
    end
end

The idea I had was to perform decrease_key! of several vertices in parallel by assigning them to independent priority queues.

function parallel_dijkstra(g::AbstractGraph, source, distmx::AbstractMatrix)
    n_t = Base.Threads.nthreads()
    nvg = nv(g)

    dists = fill(typemax(eltype(g)), nv(g))
    dists[source] = 0
    bpq = [ make_priorityqueue( [ (j, dists[i:nvg:n_t]) for j in 1:nvg:n_t ])    for i in 1:n_t) ]

    queue = [ Vector{eltype(g)}() for i in 1:n_t ]

    while !is_empty(bpq) # Check if any of the n_t priority queues are not empty
        u = dequeue!(bpq) # Compare the minimum of each of the n_t queue and dequeue it.
        for v in outneighbors(g, u)
            relax_dist = dists[u]+distmx[u, v]
            if new_dists < dists[u]
                dists[v] = relax_dists

                q_id = mod(v, n_t)+1
                push!(queue[q_id], v)
            end
        end

        #Batch Decrease Key
        @threads for thread_id in 1:n_t
            pq = bpq[thread_id]
            for v in queue[thread_id]
                decrease_key!(pq, v, dists[v])
            end
            empty!(pq)
        end
    end
end

The basic idea behind this is partition the vertices into n_t sets using the modulo (remainder) function.

Thread 1 will perform decrease_key! on priority queue bpq[1],

and only on nodes 1, n_t+1, 2*n_t+1, etc.

Thread 2 will perform decrease_key! on priority queue bpq[2],

and only on nodes 2, n_t+2, 2*n_t+2, etc.

After several days of optimising and looking out for unexpected race-conditions (Eg. reallocation when using push! and empty!), I realised that the time taken to partition the data (q_id = mod(v, n_t)+1; push!(queue[q_id], v)) exceeds the time take for decrease_key! (On a graph with |V| = 10^4).

decrease_key! requires O(log(|V|)) time while the partitioning requires O(1) time. So for some large enough |V|, partitioning would be faster. But given this, it seemed impossible to obtain valuable speed up for practical problems using this method.

3. Improved implementation of Prim’s MST

Prim’s MST was implemented using a heap of edges. I replaced it with a priority queue of vertices. This reduced the size of the data structure from O(|E|) to O(|V|). The trick here is you have to keep track of which edge to insert into the Minimum Spanning Tree when you dequeue a vertex from the priority queue.

Benchmarks


distmx = sparse(g)

for e in edges(g)
  d[e.src, e.dst] = d[e.dst, e.src] = rand(-n:n)
end


g = loadsnap(:ego_twitter_u) ( V = 81306, E = 1342310)

old_prim(g, distmx): 1.743 s (5274021 allocations: 132.38 MiB)

prim(g, distmx): 228.200 ms (802705 allocations: 20.28 MiB)



g = loadsnap(:ca_astroph) ( V = 17903, E = 197031)

old_prim(g, distmx): 149.062 ms (768457 allocations: 19.39 MiB)

prim(g, distmx): 37.570 ms (197234 allocations: 4.59 MiB)



g = loadsnap(:facebook_combined) ( V = 4039, E = 88234)

old_prim(g, distmx): 41.353 ms (340348 allocations: 8.59 MiB)

prim(g, distmx): 10.230 ms (45947 allocations: 973.91 KiB)

4. Improved implementation of Dijkstra’s SSSP

I. Cache Efficiency

Matrices in Julia are usually collumn-major. i.e. it is MUCH more efficient to iterate over a collumn of a matrix. Due to the conventional format of the distance matrix (distmx[i, j] = weight of edge i -> j), the code iterates over the row. We decided to instruct the user in the performance docs to use a Transpose (which is row-major) type matrix to represent the distance matrix.

II. Optimise Collecting additional data.

Dijkstra has some optional variables to collect data other than dists (and parents). For example, dijkstra_shortest_paths(g, allpaths=true) will return a predecessor relation that can be used to find all the shortest paths from source to a particular vertex.

For all v in vertices(g), preds[v] = [u in inneighbors(g) if dists[u]+distmx[u, v] == dists[v]]

My first attempt was to separate the additional data collection from the usual algorithm. Eg.

I replaced

function old_dijkstra(g::AbstractGraph, source::Integer, distmx::AbstractMatrix)  # allpaths = true
    dists = fill(typemax(eltype(g)), nv(g))
    dists[source] = 0
    pq = make_priorityqueue(dists)
    preds = fill(Vector(), nv(g))

    while !is_empty(pq)
        u = dequeue!(pq)
        for v in outneighbors(g, u)
            relax_dist = dists[u]+distmx[u, v]
            if relax_dists < dists[u]
                dists[v] = relax_dists
                decrease_key!(pq, v, new_dists)
                preds[v] = []
            end
            elseif relax_dists == dists[u]
                push!(preds[v], u)
            end
        end
    end
end

with

function attempt_dijkstra(g::AbstractGraph, source::Integer, distmx::AbstractMatrix) # allpaths = true
    dists = fill(typemax(eltype(g)), nv(g))
    dists[source] = 0
    pq = make_priorityqueue(dists)
    num_preds = zeros(nv(g))

    while !is_empty(pq)
        u = dequeue!(pq)
        for v in outneighbors(g, u)
            relax_dist = dists[u]+distmx[u, v]
            if relax_dists < dists[u]
                dists[v] = relax_dists
                decrease_key!(pq, v, new_dists)
                num_preds[v] = 1
            elseif relax_dists == dists[u]
                num_preds[v] += 1
            end
        end
    end

    preds = [sizehint!(Vector(), num_preds[i]) for i in vertices(g)]
    for e in edges(g)
        if dists[e.dst] == dists[e.src] + distmx[e.src, e.dst]
            push!(preds[e.dst], e.src)
        end
    end
end

I believed this would cause a speed up because it does not unnecessarily push and empty preds[v]. Also sizehint! is used to minimise reallocations.

However, it caused the code to slow down. The cost of iterating over edges exceeds the benefit. In hindsight, this could be expected because it is not common for many different paths to a particular vertex to have the exact same distance.

I was able to obtain a speed up by re-using arrays. I simply replaced preds[v] = [] with resize!(preds[v], 1).

Benchmarks


src = 1

distmx = sparse(g)

for e in edges(g)
  d[e.src, e.dst] = d[e.dst, e.src] = rand(-n:n)
end

tr_distmx = transpose(distmx)



g = loadsnap(:ego_twitter_u) (|V| = 81306, |E| = 1342310)

old_dijkstra_shortest_paths(g, src, distmx): 559.962 ms (292429 allocations: 33.16 MiB)

dijkstra_shortest_paths(g, src, distmx): 492.507 ms (67 allocations: 10.85 MiB)

dijkstra_shortest_paths(g, src, tr_distmx): 235.182 ms (67 allocations: 10.85 MiB)



g = loadsnap(:ca_astroph) (|V| = 17903, |E| = 197031)

old_dijkstra_shortest_paths(g, src, distmx): 64.641 ms (52125 allocations: 6.06 MiB)

dijkstra_shortest_paths(g, src, distmx): 53.635 ms (59 allocations: 2.09 MiB)

dijkstra_shortest_paths(g, src, tr_distmx): 30.818 ms (59 allocations: 2.09 MiB)



g = loadsnap(:facebook_combined) (|V| = 4039, |E| = 88234)

old_dijkstra_shortest_paths(g, src, distmx): 16.252 ms (16509 allocations: 1.66 MiB)

dijkstra_shortest_paths(g, src, distmx): 13.380 ms (47 allocations: 417.92 KiB)

dijkstra_shortest_paths(g, src, tr_distmx): 9.720 ms (47 allocations: 417.92 KiB)



allpaths = trackvertices = true



g = loadsnap(:ego_twitter_u) (|V| = 81306, |E| = 1342310)

old_dijkstra_shortest_paths(g, src, distmx): 598.619 ms (521217 allocations: 47.38 MiB)

dijkstra_shortest_paths(g, src, distmx): 534.856 ms (81391 allocations: 18.29 MiB)

dijkstra_shortest_paths(g, src, tr_distmx): 269.884 ms (81391 allocations: 18.29 MiB)



g = loadsnap(:ca_astroph) (|V| = 17903, |E| = 197031)

old_dijkstra_shortest_paths(g, src, distmx): 67.667 ms (96120 allocations: 8.90 MiB)

dijkstra_shortest_paths(g, src, distmx): 59.666 ms (17971 allocations: 3.73 MiB)

dijkstra_shortest_paths(g, src, tr_distmx): 33.853 ms (17971 allocations: 3.73 MiB)



g = loadsnap(:facebook_combined) (|V| = 4039, |E| = 88234)

old_dijkstra_shortest_paths(g, src, distmx): 16.958 ms (28984 allocations: 2.46 MiB)

dijkstra_shortest_paths(g, src, distmx): 13.678 ms (4102 allocations: 831.09 KiB)

dijkstra_shortest_paths(g, src, tr_distmx): 9.922 ms (4102 allocations: 831.09 KiB)