feat ✨ : Add closeness centrality and floyd warshall feature #72

Open
wants to merge 23 commits into base: main

23 commits
4a4809b
docs: :construction: write the first definition of new function
Fre0Grella Jul 15, 2024
62c646d
feat: :sparkles: implement the core function of tiled floyd warshall
Fre0Grella Jul 18, 2024
b93a4e4
feat: :construction: Add some private function and a parameter for ma…
Fre0Grella Jul 21, 2024
45770c2
feat: :sparkles: Add code for the third phase of the alogrithm
Fre0Grella Jul 24, 2024
f5e6065
feat: :sparkles: Add feature for graph with a prime number of nodes
Fre0Grella Jul 25, 2024
645d67d
docs: :memo: Remove TODO marker and update docs
Fre0Grella Jul 25, 2024
8eb1b93
feat: :sparkles: Implement the closeness centrality function
Fre0Grella Jul 31, 2024
abcab94
Merge branch 'main' into feature/closeness
Fre0Grella Jul 31, 2024
90e1063
feat: :wrench: add closeness alg into interface.py and __init__.py
Fre0Grella Jul 31, 2024
e88c3b7
fix: :construction: change parallel graph object into normal graph ob…
Fre0Grella Aug 2, 2024
d73fd49
fix: :construction: Fix indexing problem of the matrix, still not wor…
Fre0Grella Aug 3, 2024
ee252ff
fix: :construction: fix matrix shape problem
Fre0Grella Aug 4, 2024
1e68ae4
fix: :bug: Fix Floyd Warshall Tiling, functioning
Fre0Grella Aug 8, 2024
c4e89c0
feat: :sparkles: Introduce normal floyd_warshall instead of numpy flo…
Fre0Grella Aug 14, 2024
03071c2
fix: :bug: fix bug in the submatrix division function
Fre0Grella Aug 18, 2024
6e5932c
fix: :construction: fix weight problem
Fre0Grella Aug 19, 2024
76e9ef4
Merge branch 'networkx:main' into feature/closeness
Fre0Grella Sep 11, 2024
020eaad
style :art: fix format style
Sep 23, 2024
af67d0d
Merge branch 'main' into feature/closeness
Schefflera-Arboricola Sep 27, 2024
6ecb2e8
sorry missed closeness_centrality
Schefflera-Arboricola Sep 27, 2024
0733949
Updating __init__.py
Schefflera-Arboricola Sep 27, 2024
eb9b139
Merge branch 'networkx:main' into feature/closeness
Fre0Grella Sep 27, 2024
1048683
style: :green_heart: fix style
Sep 27, 2024
15 changes: 15 additions & 0 deletions _nx_parallel/__init__.py
@@ -90,6 +90,13 @@ def get_info():
'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n_jobs` number of chunks."
},
},
"closeness_centrality": {
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/closeness.py#L11",
"additional_docs": "The parallel implementation of closeness centrality, that use parallel tiled floy warshall to find the geodesic distance of the node",
"additional_parameters": {
"blocking_factor : number": "The number used for divinding the adjacency matrix in sub-matrix. The default blocking factor is get by finding the optimal value for the core available"
},
},
"closeness_vitality": {
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/vitality.py#L10",
"additional_docs": "The parallel computation is implemented only when the node is not specified. The closeness vitality for each node is computed concurrently.",
@@ -104,6 +111,14 @@ def get_info():
'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n_jobs` number of chunks."
},
},
"floyd_warshall": {
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/dense.py#L12",
"additional_docs": "Parallel implementation of Floyd warshall using the tiled floyd warshall algorithm [1]_.",
"additional_parameters": {
"blocking_factor : number": "The number used for divinding the adjacency matrix in sub-matrix. The default blocking factor is get by finding the optimal value for the core available",
"A : 2D array": "All pairs shortest paths Graph adjacency matrix",
},
},
"is_reachable": {
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/tournament.py#L13",
"additional_docs": "The function parallelizes the calculation of two neighborhoods of vertices in `G` and checks closure conditions for each neighborhood subset in parallel.",
1 change: 1 addition & 0 deletions nx_parallel/algorithms/centrality/__init__.py
@@ -1 +1,2 @@
from .betweenness import *
from .closeness import *
73 changes: 73 additions & 0 deletions nx_parallel/algorithms/centrality/closeness.py
@@ -0,0 +1,73 @@
"""
Closeness centrality measures.
"""

from joblib import Parallel, delayed
import nx_parallel as nxp

__all__ = ["closeness_centrality"]


def closeness_centrality(
G, u=None, distance=None, wf_improved=True, blocking_factor=None
):
"""The parallel implementation of closeness centrality, that use parallel tiled floy warshall to find the
geodesic distance of the node

networkx.closeness_centrality : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.centrality.closeness_centrality.html

Parameters
----------
blocking_factor : number
The number used for dividing the adjacency matrix into sub-matrices.
The default blocking factor is obtained by finding the optimal value
for the available cores.
"""
if hasattr(G, "graph_object"):
G = G.graph_object

if G.is_directed():
G = G.reverse() # create a reversed graph view

A = nxp.floyd_warshall(G, weight=distance, blocking_factor=blocking_factor)
len_G = len(G)

key_value_pair = Parallel(n_jobs=-1)(
delayed(_closeness_measure)(k, n, wf_improved, len_G) for k, n in A.items()
)
closeness_dict = {}
for key, value in key_value_pair:
closeness_dict[key] = value
if u is not None:
return closeness_dict[u]
return closeness_dict


def _closeness_measure(k, v, wf_improved, len_G):
"""calculate the closeness centrality measure of one node using the row of edges i

Parameters
----------
v : dict
the distances from node `k` to every other node

Returns
-------
(k, closeness_value) : tuple
the node key and its closeness value
"""
n = v.values()
# print(n)
n_reachable = [x for x in n if x != float("inf")]
# print(n_reachable,len(n_reachable))
totsp = sum(n_reachable)
# print(totsp)
closeness_value = 0.0
if totsp > 0.0 and len_G > 1:
closeness_value = (len(n_reachable) - 1.0) / totsp
# normalize to number of nodes-1 in connected part
if wf_improved:
s = (len(n_reachable) - 1.0) / (len_G - 1)
closeness_value *= s

return k, closeness_value
Comment on lines +46 to +73
Contributor

I wonder if it could make sense to include an initial check in this function for graph disconnectedness, the goal being to "fail fast" to prevent edge-case scenarios where disconnected nodes lead to infinite geodesic distances. WDYT?

Author

Yeah, a transitive closure test could be added in the future, I think.
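
For reference, a minimal sketch of what such a fail-fast guard could look like at the top of closeness_centrality (hypothetical and not part of this PR; the current implementation instead tolerates disconnected graphs by filtering out infinite distances in _closeness_measure):

import networkx as nx


def _fail_fast_if_disconnected(G):
    # Hypothetical guard sketched from the review discussion above; not part of the PR.
    # Raise early instead of letting unreachable pairs produce infinite geodesic distances.
    connected = (
        nx.is_strongly_connected(G) if G.is_directed() else nx.is_connected(G)
    )
    if not connected:
        raise nx.NetworkXError("closeness_centrality: the graph is disconnected")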

1 change: 1 addition & 0 deletions nx_parallel/algorithms/shortest_paths/__init__.py
@@ -1,3 +1,4 @@
from .generic import *
from .weighted import *
from .unweighted import *
from .dense import *
251 changes: 251 additions & 0 deletions nx_parallel/algorithms/shortest_paths/dense.py
@@ -0,0 +1,251 @@
"""Floyd-Warshall algorithm for shortest paths."""

from joblib import Parallel, delayed
import nx_parallel as nxp
import math

__all__ = [
"floyd_warshall",
]


def floyd_warshall(G, weight="weight", blocking_factor=None):
"""
Parallel implementation of Floyd-Warshall using the tiled Floyd-Warshall algorithm [1]_.


networkx.floyd_warshall_numpy : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.shortest_paths.dense.floyd_warshall_numpy.html

Parameters
----------
blocking_factor : number
The number used for dividing the adjacency matrix into sub-matrices.
The default blocking factor is obtained by finding the optimal value
for the available cores.

Returns
-------
A : 2D array
All-pairs shortest-paths graph adjacency matrix

References
----------
.. [1] Gary J. Katz and Joseph T. Kider Jr:
All-Pairs Shortest-Paths for Large Graphs on the GPU, 2008.

"""
if hasattr(G, "graph_object"):
G = G.graph_object
undirected = not G.is_directed()
nodelist = list(G)
A = _adjacency_matrix(G, weight, nodelist, undirected)
n = G.number_of_nodes()

total_cores = nxp.get_n_jobs()

if blocking_factor is None:
blocking_factor, is_prime = _find_nearest_divisor(n, total_cores)
no_of_primary = n // blocking_factor

for primary_block in range(no_of_primary):
k_start = (primary_block * n) // no_of_primary
k_end = k_start + (n // no_of_primary) - 1
if is_prime and primary_block == no_of_primary - 1:
k_end = k_end + (n % no_of_primary)
k = (k_start, k_end)
# Phase 1: Compute Primary block
# Execute normal Floyd-Warshall on the primary block submatrix
_partial_floyd_warshall_numpy(A, k, k, k)

# Phase 2: Compute Cross block

params = []
for block in range(no_of_primary):
# skip the primary block computed in phase 1
if block != primary_block:
# append the actual matrix indices by multiplying the block number by the blocking factor
if is_prime and block == no_of_primary - 1:
block_coord = (block * blocking_factor, n - 1)
else:
block_coord = _block_range(blocking_factor, block)
params.append((block_coord, k))
params.append((k, block_coord))

Parallel(n_jobs=(no_of_primary - 1) * 2, require="sharedmem")(
delayed(_partial_floyd_warshall_numpy)(A, k, i, j) for (i, j) in params
)

# Phase 3: Compute remaining
params.clear()
for block_i in range(no_of_primary):
for block_j in range(no_of_primary):
# skip all blocks previously computed, i.e. every block with the primary block value
if block_i != primary_block or block_j != primary_block:
i_range = _block_range(blocking_factor, block_i)
j_range = _block_range(blocking_factor, block_j)
if is_prime:
if block_i == no_of_primary - 1:
i_range = (block_i * blocking_factor, n - 1)
if block_j == no_of_primary - 1:
j_range = (block_j * blocking_factor, n - 1)
params.append((i_range, j_range))
Parallel(n_jobs=(no_of_primary - 1) ** 2, require="sharedmem")(
Member
@Schefflera-Arboricola Sep 27, 2024

Recently we have made it so that all the parameters of the Parallel call are controlled by the user and we don't pass anything in it. I would highly encourage you (@Fre0Grella) to join our weekly nx-parallel meetings: https://scientific-python.org/calendars/networkx.ics, if you can. I think your insights and feedback would really help us shape the nx-parallel project in a better way. Thank you :)

delayed(_partial_floyd_warshall_numpy)(A, k, i, j) for (i, j) in params
)
dist = _matrix_to_dict(A, nodelist)

return dist


def _partial_floyd_warshall_numpy(A, k_iteration, i_iteration, j_iteration):
"""
Compute partial FW in the determined sub-block for the execution of
parallel tiled FW.

Parameters
----------
A : 2D numpy.ndarray
matrix that represents the adjacency matrix of the graph

k_iteration : tuple
range (start-end) of the primary block in the current iteration

i_iteration : tuple
range (start-end) of the rows in the current block computed

j_iteration : tuple
range (start-end) of the columns in the current block computed

Returns
-------
A : 2D numpy.ndarray
adjacency matrix updated after floyd warshall
"""
for k in range(k_iteration[0], k_iteration[1] + 1):
for i in range(i_iteration[0], i_iteration[1] + 1):
for j in range(j_iteration[0], j_iteration[1] + 1):
A[i][j] = min(A[i][j], (A[i][k] + A[k][j]))


def _block_range(blocking_factor, block):
return (block * blocking_factor, (block + 1) * blocking_factor - 1)


def _calculate_divisor(i, x, y):
if x % i == 0:
divisor1 = result2 = i
result1 = divisor2 = x // i
# difference1 = abs((result1 - 1) ** 2 - y)
difference1 = abs(result1 - y)
# difference2 = abs((result2 - 1) ** 2 - y)
difference2 = abs(result2 - y)

if difference1 < difference2:
Comment on lines +139 to +143
Contributor

Could be missing something obvious, but aren't divisor1 and divisor2 actually going to be the same thing here (i)? i.e. do we even need to compare difference1 with difference2?

I wonder whether this could be simplified to something like:

def _calculate_divisor(i, x, y):
    if x % i == 0:
        quotient = x // i
        return (i, quotient, abs(i - y))
    return None

?

Author

Not really the same thing: the idea is to find the divisor of x (the size of the matrix) most similar to y (the cores available).
To find it, I try to divide x by every number from 1 to sqrt(x), so when I do x // i, divisor1 is going to be i and the result of x // i is going to be divisor2.

An example to clarify:
x=10
y=4
i=2
x // i = 5
So divisor1 = 2, divisor2 = 5

divisor2 is the most similar to 4, so I choose 5 as the divisor.

Contributor

Thanks for the clarification on this @Fre0Grella

I think what I'm still struggling with is the need for two separate comparisons. Since divisor1 is i and divisor2 is x // i, aren't you always comparing i and x // i? The current implementation is functionally correct, but this seems like it could be greatly simplified without losing the intent of matching divisor2 more closely to y.

For example, maybe we could directly compute the abs difference of both i and x // i with y, then just return whichever is closest? And this would avoid the extra complexity in checking two sets of variables. So, building on my (incomplete) alternative suggestion previously, this might look something like:

def _calculate_divisor(i, x, y):
    if x % i == 0:
        divisor = x // i
        return (i, divisor, min(abs(i - y), abs(divisor - y)))
    return None

This keeps the logic intact, but then we don't need to assign and compare divisor1 and divisor2 explicitly. Let me know what you think / if I'm missing something!

return divisor1, result1, difference1
else:
return divisor2, result2, difference2
return None


def _find_nearest_divisor(x, y):
"""
Find the optimal value for the blocking factor parameter.

Parameters
----------
x : number of nodes

y : number of CPU cores available
"""
if x < y:
return 1, False
# Find the square root of x
sqrt_x = int(math.sqrt(x)) + 1

# Execute the calculation in parallel
results = Parallel(n_jobs=-1)(
delayed(_calculate_divisor)(i, x, y) for i in range(2, sqrt_x)
)

# Filter out None results
results = [r for r in results if r is not None]

if len(results) <= 0:
# If no divisor was found, x is prime; repeat the process with the non-prime number x - 1
best_divisor, _ = _find_nearest_divisor(x - 1, y)
return best_divisor, True
# Find the best divisor
best_divisor, _, _ = min(results, key=lambda x: x[2])

return best_divisor, False
Comment on lines +150 to +180
Contributor

Curious: for large graphs with a prime number of nodes, it seems like the final block could be disproportionately larger, leading to load imbalance across cores?

Author
@Fre0Grella Oct 18, 2024

Yes, it could lead to load imbalance, but I think the disproportion is sort of minimized.
I don't know if there's a better way of doing it.

Contributor

Hm, what about just swapping:

best_divisor, _ = _find_nearest_divisor(x - 1, y)

with something as simple as:

best_divisor = max(1, x // y)

to distribute load approximately evenly if no divisor is found?
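
For concreteness, a small sketch (not part of the PR) of how the remainder lands on the final block when the node count is prime, assuming n = 13 nodes and 4 available cores; the helper below mirrors _block_range and the is_prime branch above:

n = 13  # prime number of nodes
blocking_factor = 3  # what the fallback _find_nearest_divisor(12, 4) yields
no_of_primary = n // blocking_factor  # 4 blocks


def block_range(b):
    # the last block absorbs the remainder, mirroring the is_prime branch
    if b == no_of_primary - 1:
        return (b * blocking_factor, n - 1)
    return (b * blocking_factor, (b + 1) * blocking_factor - 1)


print([block_range(b) for b in range(no_of_primary)])
# [(0, 2), (3, 5), (6, 8), (9, 12)] -> the final block is one row wider than the others

The suggested max(1, x // y) fallback would pick the same blocking factor in this case (13 // 4 = 3), just without the recursive divisor search.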



def _adjacency_matrix(G, weight, nodelist, undirected):
"""
Generate an adjacency matrix as a plain Python list of lists.

Parameters
----------
G : graph
The NetworkX graph used to construct the array.

weight : string or None optional (default = 'weight')
The edge attribute that holds the numerical value used for
the edge weight. If an edge does not have that attribute, then the
value 1 is used instead.

Returns
-------
A : 2D array
Graph adjacency matrix
"""

n = len(nodelist)
# Initialize the adjacency matrix with infinity values
A = [[float("inf") for _ in range(n)] for _ in range(n)]

# Set diagonal elements to 0 (distance from node to itself)
for i in range(n):
A[i][i] = 0

def process_edge(src, dest, attribute, undirected):
src_idx = nodelist.index(src)
dest_idx = nodelist.index(dest)
A[src_idx][dest_idx] = attribute.get(weight, 1.0)
if undirected:
A[dest_idx][src_idx] = attribute.get(weight, 1.0)

# Parallel processing of edges, modifying A directly
Parallel(n_jobs=-1, require="sharedmem")(
delayed(process_edge)(src, dest, attribute, undirected)
for src, dest, attribute in G.edges(data=True)
)
return A


def _matrix_to_dict(A, nodelist):
"""
Convert a matrix (list of lists) to a dictionary of dictionaries.

Parameters
----------
A : list of lists
The adjacency matrix to be converted.

Returns
-------
dist : dict
The resulting dictionary of distances.
"""
dist = {i: {} for i in nodelist}

def process_row(row, i):
for column, j in enumerate(nodelist):
dist[i][j] = A[row][column]

# Parallel processing of rows, modifying dist directly
Parallel(n_jobs=-1, require="sharedmem")(
delayed(process_row)(row, i) for row, i in enumerate(nodelist)
)

return dist
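
For context, a hedged usage sketch of the two functions this PR adds (it assumes this nx-parallel branch is installed and that both functions are exposed at the package top level, as the interface.py and __init__.py changes suggest; the values in the comments come from the toy graph below):

import networkx as nx
import nx_parallel as nxp

G = nx.Graph()
G.add_weighted_edges_from([(0, 1, 1.0), (1, 2, 2.0), (2, 3, 1.0), (0, 3, 5.0)])

# All-pairs shortest paths as a dict of dicts, e.g. dist[0][2] == 3.0 (path 0-1-2)
dist = nxp.floyd_warshall(G, weight="weight")

# Closeness centrality computed on top of the same tiled Floyd-Warshall distances
centrality = nxp.closeness_centrality(G, distance="weight")
print(dist[0][2], centrality[0])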
2 changes: 2 additions & 0 deletions nx_parallel/interface.py
@@ -18,6 +18,7 @@
# Centrality
"betweenness_centrality",
"edge_betweenness_centrality",
"closeness_centrality",
# Efficiency
"local_efficiency",
# Shortest Paths : generic
@@ -38,6 +39,7 @@
"approximate_all_pairs_node_connectivity",
# Connectivity
"connectivity.all_pairs_node_connectivity",
"floyd_warshall",
]

