Tasks completed in weeks 9 to 10

  1. Merged pull requests.
  2. Explored max-flow algorithms.
  3. Explored optimistic concurrency control.
  4. Implemented lock-free parallel Breadth-First Search with static load balancing.
  5. WIP: Implementing lock-free parallel Breadth-First Search with dynamic load balancing.

Details

1. Merged pull requests

As we are nearing the end of GSoC, I have been spending more time on getting the code mentioned in previous posts merged into the main LightGraphs repository on GitHub. This includes:

  1. Parallel Pagerank with degree based partitioning.
  2. Parallel Bellman Ford.
  3. Improved Dijkstra.
  4. Greedy Heuristics.

2. Explored parallel Max-Flow algorithms and BFS

At the time of writing my proposal, I had intended to use atomic operations to produce parallel implementations of algorithms computing max-flow in a flow network, such as Edmonds-Karp, Dinic's and Push-Relabel. From my experiences described in previous posts, I learned that it would not be possible to obtain a performance improvement by relying on atomic operations. In fact, the atomics-based parallel Breadth-First Search implementation in LightGraphs is an order of magnitude slower than the sequential implementation on a machine with 20 cores.

After discussing this with my mentor, we decided to focus on parallelizing implementations of graph analysis algorithms without atomic operations or locks. I then came across the technique of optimistic concurrency control (described below) and attempted to implement parallel Breadth-First Search using it.

3. Optimistic concurrency control

As we have mentioned in previous posts, if multiple processes have read/write or write/write access to the data at a common memory location, it can lead to an unexpected result. This is usually avoided by using locks or atomic instructions to maintain the correctness of the code. As mentioned in section 2 above, atomic operations and locks degrade the performance of parallel code, and we may end up with code that is slower than the sequential version.

An approach to overcoming this obstacle is to allow data races to occur and then recover from them, so that they do not affect the correctness of the program.
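To make this concrete, here is a minimal sketch of the pattern (my own illustration, not code from LightGraphs): the parallel phase below is allowed to race on the shared seen array, so an item may be recorded by more than one thread, and a cheap sequential phase afterwards discards the duplicates. The parallel BFS in section 4 has exactly this structure.

using Base.Threads

function occ_collect(candidates::Vector{Int}, n::Int)
    seen = zeros(Bool, n)                    # shared; races on it are tolerated
    out_t = [Int[] for _ in 1:nthreads()]    # one private output buffer per thread
    @threads for c in candidates
        if !seen[c]
            seen[c] = true                   # two threads may both pass the check...
            push!(out_t[threadid()], c)      # ...and both record c (a duplicate)
        end
    end
    confirmed = zeros(Bool, n)               # sequential recovery phase
    out = Int[]
    for t in 1:nthreads(), c in out_t[t]
        if !confirmed[c]                     # duplicates introduced by the races are dropped here
            confirmed[c] = true
            push!(out, c)
        end
    end
    return out
end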

Examples of Race Conditions

  1. Interleaving of Machine Level Instructions.
  2. Interleaving of Memory Read/Write.
  3. Compile Time Code Optimisation.

Interleaving of Machine Level Instructions

These are race conditions that can occur due to the fact that a single high level instruction consists of multiple machine level instructions. For example:

Julia Instruction:

Thread 1: a += 1

Thread 2: a += 1

Machine Level Instructions:

Thread 1:

Read a into R_1

Increment R_1

Write R_1 into a

Thread 2:

Read a into R_2

Increment R_2

Write R_2 into a

The programmer might expect a to be incremented by 2 but find it to be incremented by 1. Since the a += 1 instruction is broken up into multiple instructions, it is possible that both threads read the initial value of a into R_1 and R_2, so one of the increments is overwritten (a lost update).
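Here is a small runnable sketch (my own, not from the original code) that can exhibit this lost update; the final count is often well below 10000 when nthreads() > 1, though the exact behaviour depends on thread scheduling.

using Base.Threads

function racy_counter()
    a = Ref(0)
    @threads for i in 1:10_000
        a[] += 1    # non-atomic read, increment, write
    end
    return a[]      # often < 10_000: concurrent increments overwrite each other
end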


Interleaving of Memory Read/Write

If the data size is greater than the word size then a single high level read/write operation will be broken up into multiple machine level read/write operations.

For example, if a variable a is 16 bits wide and the word size is 8 bits, then a single Julia write to a will consist of two machine-level writes, since a single machine-level write can only modify 8 bits at a time.

Consider a machine with 8 bit word size.

Julia Instructions:

Thread 1: a = 0xFFFF

Thread 2: a = 0x0000

Machine Level Instructions:

Thread 1:

a[byte 1] = 0xFF

a[byte 2] = 0xFF

Thread 2:

a[byte 1] = 0x00

a[byte 2] = 0x00

The programmer might expect a to hold the value 0xFFFF or 0x0000, but the interleaving of the instructions could leave it holding 0xFF00 or 0x00FF.
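As a hedged, runnable illustration (my own sketch): an Int128 spans two 64-bit words, so its store may be split into two machine-level writes. Whether a torn value is ever observed depends on how the compiler and CPU actually emit the 128-bit store, so treat this as a demonstration of the hazard rather than a guaranteed reproduction.

using Base.Threads

function torn_write(iters = 10^6)
    x = Ref{Int128}(0)
    all_ones = Int128(-1)        # all 128 bits set
    torn = Ref(false)
    @threads for t in 1:2
        for _ in 1:iters
            x[] = (t == 1) ? all_ones : Int128(0)   # may be two 64-bit stores
            v = x[]
            if v != all_ones && v != Int128(0)
                torn[] = true    # observed a value mixing both halves
            end
        end
    end
    return torn[]
end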


Compile Time Code Optimisation

During the code optimisation phase of compilation, the compiler may modify the code to improve performance without considering the effect of other threads modifying the data. For example,

a = 1
dists = zeros(Int64, n)
for i in 1:length(dists)
    dists[i] += a
end

Thread 1: a = 2

The compiler may modify this during compile time code optimisation as:

a = 1
dists = zeros(Int64, n)
for i in 1:length(dists)
    dists[i] += 1
end

Thread 1: a = 2

Hence, none of the changes made to a by thread 1 will be observed by the thread executing the loop.

Some languages allow data to be marked as volatile to inform the compiler that the data may be modified externally and must therefore be re-read from memory on every access. Julia does not have this functionality.

Note: It should be kept in mind that the compiler may reorder read/write operations to improve efficiency.

Axioms

For the remainder of this blog post, I will be assuming that the following statements are true:

I) If multiple threads have read/write access to a variable and every write is preceded by a read then the first read will obtain the correct value.

For example:

using Base.Threads

a = true
n_t = nthreads()
a_read_true = zeros(Bool, n_t)  # one flag per thread (a Bool array, so writes are independent)
@threads for thread in 1:n_t
    if a                         # racy read of the shared flag
        a = false                # racy write; every write is preceded by a read
        a_read_true[thread] = true
    end
end

At the end of this code, the programmer can expect that at least one entry in a_read_true will be set to true.

Justification: This is a reasonable assumption because the programmer always expects a read operation to succeed when a write operation is not occurring concurrently. Since every write operation is preceded by a read, the first read operation must have occurred without any concurrent write operation.

II) If multiple threads write the same value into a variable then at the end of the multithreaded loop, the variable will have the common value.

For example:

n = 100
A = zeros(Int64, n)
indices = rand(1:n, 1000)
@threads for i in indices
    A[i] = 1
end

At the end of this code, the programmer can expect that:

a) If i is in indices then, at the end of the loop, A[i] = 1.

b) If i is not in indices then, at the end of the loop, A[i] = 0.

Justification: Every machine-level write stores the same value, so the writes are idempotent and interleaving the machine instructions has no effect.

It should be noted that this axiom holds only if the write operation A[i] = 1 touches memory that belongs to A[i] alone. If the elements of A are packed at a granularity finer than a machine write can address (as in a bit array), a single write rewrites a chunk shared with neighbouring elements and the axiom no longer applies.
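For instance (my own sketch, not from the post), a BitVector packs 64 flags into one UInt64 chunk, so B[i] = true is a read-modify-write of a chunk shared with 63 other flags, while a Vector{Bool} write touches its own byte. Running the following with several threads, count(V) always ends up equal to n, whereas count(B) can come out lower because chunk updates get lost.

using Base.Threads

function bit_vs_bool(n = 64)
    B = falses(n)          # BitVector: flags packed into shared 64-bit chunks
    V = zeros(Bool, n)     # Vector{Bool}: one byte per flag
    @threads for i in 1:n
        B[i] = true        # read-modify-write of the shared chunk: racy
        V[i] = true        # independent byte write: Axiom II applies
    end
    return count(B), count(V)
end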

III) If a write operation does not change the value of a variable then concurrent read/write will not cause a race condition.

For example:

Initially, a = 100

Thread 1: a = 100

Thread 2: b = a

The programmer can expect that b will read a as 100.

Justification: Even if machine level instructions are interleaved, the value of a will not change throughout the write operation.
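A runnable version of this example (my own sketch), using Refs so that the store really does rewrite the same machine word:

using Base.Threads

a = Ref(100)
b = Ref(0)
@threads for t in 1:2
    if t == 1
        a[] = 100    # stores the bit pattern a already holds
    else
        b[] = a[]    # concurrent read; observes 100 regardless of interleaving
    end
end
b[]    # == 100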

4. Lock-free parallel BFS with static load balancing

Breadth-First Search is used to find the minimum hop distance (equivalent to the shortest-path distance when every edge has unit weight) from a set of source vertices to all vertices in a graph.

function gdistances!(g::AbstractGraph{T}, sources::Vector{<:Integer}) where T<:Integer
   
    n = nv(g)
    vert_level = fill(typemax(T), n) # Minimum hop distances

    visited = falses(n)
    cur_level = Vector{T}()
    next_level = Vector{T}()

    sizehint!(cur_level, n)
    sizehint!(next_level, n)

    @inbounds for s in sources
        push!(cur_level, s)
        vert_level[s] = zero(T)
        visited[s] = true
    end

    n_level = one(T) # The level being explored 
    while !isempty(cur_level)
        @inbounds for v in cur_level
            @inbounds @simd for i in outneighbors(g, v)
                if !visited[i] # i has not been explored yet
                    push!(next_level, i)
                    vert_level[i] = n_level
                    visited[i] = true
                end
            end
        end
        n_level += one(T) #Increment the level
        empty!(cur_level)
        cur_level, next_level = next_level, cur_level
        sort!(cur_level) # For Cache Efficiency 
    end
    return vert_level
end

As you can see, the program explores the graph starting from the vertices in sources. When a vertex i is explored for the first time, its level (vert_level[i]) is set to the current iteration number (n_level).

In the parallel implementation, each thread t has its own next_level list (next_level_t[t]). In each iteration of the graph exploration, cur_level is partitioned so that each thread receives its own set of vertices to explore.

Each thread t iterates over a vertex v in its own partition and explores i in outneighbors(g, v). If i has not been explored yet (!visited[i]) then the thread pushes i into next_level_t[t].

function unweighted_fast_partition!(num_items::Integer, part::Vector{UnitRange{T}}) where T<:Integer
    prefix_sum = 0
    num = length(part)
    for t in 1:num
        left = prefix_sum
        prefix_sum += div(num_items-1+t, num)
        part[t] = (left+1):prefix_sum
    end
end
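For example (a small usage illustration I have added), splitting 10 items across 3 threads yields contiguous, nearly equal ranges:

part = Vector{UnitRange{Int}}(undef, 3)
unweighted_fast_partition!(10, part)
# part == [1:3, 4:6, 7:10]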

function parallel_gdistances!(
    g::AbstractGraph{T}, 
    source::Vector{<:Integer}
    ) where T <:Integer
 
    n = nv(g)

    visited = zeros(Bool, n)
    old_visited = zeros(Bool, n)
    vert_level = fill(typemax(T), n)

    next_level_t = [Vector{T}() for _ in 1:nthreads()]
    cur_level = deepcopy(source)
    sizehint!(cur_level, n)

    for s in source
        visited[s] = old_visited[s] = true
        vert_level[s] = zero(T)
    end

    n_level = one(T)
    partitions = Vector{UnitRange{T}}(undef, nthreads())

    while !isempty(cur_level)
        unweighted_fast_partition!(length(cur_level), partitions)
        
        @threads for v_set in partitions
            t = threadid()
            next_level = next_level_t[t] #Each thread gets its own next_level array
            
            for v in v_set
                for i in outneighbors(g, v)
                    if !visited[i]
                        visited[i] = true # Race condition
                        push!(next_level, i)
                    end
                end
            end

        end

        empty!(cur_level)
        @inbounds for t in 1:nthreads()

            for v in next_level_t[t]
                if !old_visited[v] 
                    old_visited[v] = true
                    push!(cur_level, v)
                    vert_level[v] = n_level
                end
            end
            empty!(next_level_t[t])
        end


        n_level += one(T)
        sort!(cur_level, alg = QuickSort) # Cache efficiency
    end

    return vert_level
end

The only data that multiple threads have read/write access to is the visited array. The old_visited array is used to ensure that no vertex is explored multiple times.

Data Races

A data race can occur on the visited array if two threads are exploring vertices u and w respectively, and some vertex i is present in both outneighbors(g, u) and outneighbors(g, w). Recall that each thread has its own next_level array.

for i in outneighbors(g, v)
    if !visited[i]
        visited[i] = true # Race condition
        push!(next_level, i)
    end
end

It can be inferred from Axioms I and II that these data races do not compromise the correctness of the program: every write to visited stores the same value (true), and even if two threads both pass the !visited[i] check and push i into their own next_level arrays, the sequential merge loop skips the duplicate because old_visited[i] is already set the second time i is encountered.

Note that this implementation uses a boolean array, whereas the sequential implementation uses a bit array. This is to avoid the complication explained in axiom II which occurs when write operations are not independent.

Benchmarks


nthreads() = 4, sources = [1,]

g = random_regular_graph(200000, 400) (|V| = 200000, |E| = 40000000)

  gdistances(g, sources): 136.482 ms (9 allocations: 4.60 MiB)
  parallel_gdistances!(g, sources): 90.917 ms (28 allocations: 10.73 MiB)

g = loadsnap(:ego_twitter_u) (|V| = 81306, |E| = 1342310)

  gdistances(g, sources): 13.741 ms (6 allocations: 1.25 MiB)
  parallel_gdistances!(g, sources): 13.046 ms (26 allocations: 3.88 MiB)


As you can see, BFS on g = random_regular_graph(200000, 400) obtained a good speed-up, while BFS on g = loadsnap(:ego_twitter_u) showed almost no improvement. This is likely because the degree distribution of the latter graph is less uniform than that of the former, which leads to poor load balancing. Using a better partitioning technique such as optimal_weighted_partition does not help here, as the time spent partitioning exceeds the time it saves during the traversal. I will try to overcome this issue using dynamic load balancing techniques.
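For reference, here is a rough sketch of what degree-weighted partitioning looks like (my own illustration, not the optimal_weighted_partition mentioned above): each range is grown until it covers roughly an equal share of the total out-degree. Any scheme of this kind needs at least one extra pass over cur_level plus degree lookups per level, which is the overhead referred to above.

# Assumes LightGraphs' outdegree is available.
function degree_weighted_partition!(part::Vector{UnitRange{Int}}, g, cur_level)
    total = sum(outdegree(g, v) for v in cur_level)   # total work in this level
    nparts = length(part)
    left = 1
    right = 0
    acc = 0
    for t in 1:(nparts - 1)
        # grow partition t until it holds roughly t/nparts of the total out-degree
        while right < length(cur_level) && acc < t * total / nparts
            right += 1
            acc += outdegree(g, cur_level[right])
        end
        part[t] = left:right
        left = right + 1
    end
    part[nparts] = left:length(cur_level)
    return part
end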

5. Lock-free parallel BFS with dynamic load balancing

The idea is that each thread t will be provided with its own cur_level array (cur_level_t[t]) as well as its own next_level array (next_level_t[t]). During each level exploration, each thread t explores all the vertices in cur_level_t[t]. Once it has finished exploring the vertices in cur_level_t[t], the thread will steal vertices from the cur_level arrays of other threads and explore them, until no unexplored vertices are left in any of the cur_level_t arrays.

I will be referring to the BFSCL algorithm presented in the paper titled "Avoiding Locks and Atomic Instructions in Shared-Memory Parallel BFS Using Optimistic Parallelization" by J. Tithi et al. As one might expect, this implementation will be considerably more complicated than the statically load balanced one.