Skip to content

Commit

Permalink
src: Continue passing NIC index where needed
Browse files Browse the repository at this point in the history
  • Loading branch information
philipmarshall21 committed May 17, 2024
1 parent 897aa51 commit 2a3ec52
Show file tree
Hide file tree
Showing 10 changed files with 370 additions and 252 deletions.
233 changes: 123 additions & 110 deletions src/collectives.c

Large diffs are not rendered by default.

142 changes: 101 additions & 41 deletions src/collectives_c.c4

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,9 @@ shmem_internal_shutdown(void)
return;
}

shmem_internal_barrier_all();
size_t nic_idx = 0;
SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx);
shmem_internal_barrier_all(nic_idx);

shmem_internal_finalized = 1;

Expand Down
12 changes: 9 additions & 3 deletions src/lock_c.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ shmem_clear_lock(long *lockp)
SHMEM_ERR_CHECK_INITIALIZED();
SHMEM_ERR_CHECK_SYMMETRIC(lockp, sizeof(long));

shmem_internal_clear_lock(lockp);
size_t nic_idx = 0;
SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx);
shmem_internal_clear_lock(lockp, nic_idx);
}


Expand All @@ -54,7 +56,9 @@ shmem_set_lock(long *lockp)
SHMEM_ERR_CHECK_INITIALIZED();
SHMEM_ERR_CHECK_SYMMETRIC(lockp, sizeof(long));

shmem_internal_set_lock(lockp);
size_t nic_idx = 0;
SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx);
shmem_internal_set_lock(lockp, nic_idx);
}


Expand All @@ -64,5 +68,7 @@ shmem_test_lock(long *lockp)
SHMEM_ERR_CHECK_INITIALIZED();
SHMEM_ERR_CHECK_SYMMETRIC(lockp, sizeof(long));

return shmem_internal_test_lock(lockp);
size_t nic_idx = 0;
SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx);
return shmem_internal_test_lock(lockp, nic_idx);
}
120 changes: 68 additions & 52 deletions src/shmem_collectives.h

Large diffs are not rendered by default.

34 changes: 17 additions & 17 deletions src/shmem_lock.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ typedef struct lock_t lock_t;


static inline void
shmem_internal_clear_lock(long *lockp)
shmem_internal_clear_lock(long *lockp, size_t nic_idx)
{
lock_t *lock = (lock_t*) lockp;
int curr, cond, zero = 0, sig = SIGNAL_MASK;
Expand All @@ -47,8 +47,8 @@ shmem_internal_clear_lock(long *lockp)
/* release the lock if I'm the last to try to obtain it */
cond = shmem_internal_my_pe + 1;
shmem_internal_cswap(SHMEM_CTX_DEFAULT, &(lock->last), &zero, &curr, &cond,
sizeof(int), 0, SHM_INTERNAL_INT, 0); // Multiplex across NICs?
shmem_internal_get_wait(SHMEM_CTX_DEFAULT, 0); // Multiplex across NICs?
sizeof(int), 0, SHM_INTERNAL_INT, nic_idx); // Multiplex across NICs?
shmem_internal_get_wait(SHMEM_CTX_DEFAULT, nic_idx); // Multiplex across NICs?

/* if local PE was not the last to hold the lock, look for the next in line */
if (curr != shmem_internal_my_pe + 1) {
Expand All @@ -58,8 +58,8 @@ shmem_internal_clear_lock(long *lockp)
for (;;) {
shmem_internal_atomic_fetch(SHMEM_CTX_DEFAULT, &cur_data, &(lock->data),
sizeof(int), shmem_internal_my_pe,
SHM_INTERNAL_INT, 0); // Multiplex across NICs?
shmem_internal_get_wait(SHMEM_CTX_DEFAULT, 0); // Multiplex across NICs?
SHM_INTERNAL_INT, nic_idx); // Multiplex across NICs?
shmem_internal_get_wait(SHMEM_CTX_DEFAULT, nic_idx); // Multiplex across NICs?

if (NEXT(cur_data) != 0)
break;
Expand All @@ -69,21 +69,21 @@ shmem_internal_clear_lock(long *lockp)

/* set the signal bit on new lock holder */
shmem_internal_mswap(SHMEM_CTX_DEFAULT, &(lock->data), &sig, &curr,
&sig, sizeof(int), NEXT(cur_data) - 1, SHM_INTERNAL_INT, 0);// Multiplex across NICs?
shmem_internal_get_wait(SHMEM_CTX_DEFAULT, 0); // Multiplex across NICs?
&sig, sizeof(int), NEXT(cur_data) - 1, SHM_INTERNAL_INT, nic_idx);// Multiplex across NICs?
shmem_internal_get_wait(SHMEM_CTX_DEFAULT, nic_idx); // Multiplex across NICs?
}
}


static inline void
shmem_internal_set_lock(long *lockp)
shmem_internal_set_lock(long *lockp, size_t nic_idx)
{
lock_t *lock = (lock_t*) lockp;
int curr, zero = 0, me = shmem_internal_my_pe + 1;

/* initialize my elements to zero */
shmem_internal_atomic_set(SHMEM_CTX_DEFAULT, &(lock->data), &zero,
sizeof(zero), shmem_internal_my_pe, SHM_INTERNAL_INT);
sizeof(zero), shmem_internal_my_pe, SHM_INTERNAL_INT, nic_idx);
shmem_internal_quiet(SHMEM_CTX_DEFAULT);

/* update last with my value to add me to the queue */
Expand All @@ -96,16 +96,16 @@ shmem_internal_set_lock(long *lockp)
int next_mask = NEXT_MASK;

shmem_internal_mswap(SHMEM_CTX_DEFAULT, &(lock->data), &me, &curr,
&next_mask, sizeof(int), curr - 1, SHM_INTERNAL_INT, 0); // Multiplex across NICs?
shmem_internal_get_wait(SHMEM_CTX_DEFAULT, 0); // Multiplex across NICs?
&next_mask, sizeof(int), curr - 1, SHM_INTERNAL_INT, nic_idx); // Multiplex across NICs?
shmem_internal_get_wait(SHMEM_CTX_DEFAULT, nic_idx); // Multiplex across NICs?

/* now wait for the signal part of data to be non-zero */
for (;;) {
int cur_data;

shmem_internal_atomic_fetch(SHMEM_CTX_DEFAULT, &cur_data, &(lock->data),
sizeof(int), shmem_internal_my_pe, SHM_INTERNAL_INT, 0); // Multiplex across NICs?
shmem_internal_get_wait(SHMEM_CTX_DEFAULT, 0); // Multiplex across NICs?
sizeof(int), shmem_internal_my_pe, SHM_INTERNAL_INT, nic_idx); // Multiplex across NICs?
shmem_internal_get_wait(SHMEM_CTX_DEFAULT, nic_idx); // Multiplex across NICs?

if (SIGNAL(cur_data) != 0)
break;
Expand All @@ -122,20 +122,20 @@ shmem_internal_set_lock(long *lockp)


static inline int
shmem_internal_test_lock(long *lockp)
shmem_internal_test_lock(long *lockp, size_t nic_idx)
{
lock_t *lock = (lock_t*) lockp;
int curr, me = shmem_internal_my_pe + 1, zero = 0;

/* initialize my elements to zero */
shmem_internal_atomic_set(SHMEM_CTX_DEFAULT, &(lock->data), &zero,
sizeof(zero), shmem_internal_my_pe, SHM_INTERNAL_INT);
sizeof(zero), shmem_internal_my_pe, SHM_INTERNAL_INT, nic_idx);
shmem_internal_quiet(SHMEM_CTX_DEFAULT);

/* add self to last if and only if the lock is zero (ie, no one has the lock) */
shmem_internal_cswap(SHMEM_CTX_DEFAULT, &(lock->last), &me, &curr, &zero,
sizeof(int), 0, SHM_INTERNAL_INT, 0); // Multiplex across NICs?
shmem_internal_get_wait(SHMEM_CTX_DEFAULT, 0); // Multiplex across NICs?
sizeof(int), 0, SHM_INTERNAL_INT, nic_idx); // Multiplex across NICs?
shmem_internal_get_wait(SHMEM_CTX_DEFAULT, nic_idx); // Multiplex across NICs?

if (0 == curr) {
shmem_internal_membar_acquire();
Expand Down
30 changes: 16 additions & 14 deletions src/shmem_team.c
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ int shmem_internal_team_translate_pe(shmem_internal_team_t *src_team, int src_pe

int shmem_internal_team_split_strided(shmem_internal_team_t *parent_team, int PE_start, int PE_stride,
int PE_size, const shmem_team_config_t *config, long config_mask,
shmem_internal_team_t **new_team)
shmem_internal_team_t **new_team, size_t nic_idx)
{

*new_team = SHMEM_TEAM_INVALID;
Expand Down Expand Up @@ -320,7 +320,7 @@ int shmem_internal_team_split_strided(shmem_internal_team_t *parent_team, int PE
int my_pe = shmem_internal_pe_in_active_set(shmem_internal_my_pe,
global_PE_start, PE_stride, PE_size);

long *psync = shmem_internal_team_choose_psync(parent_team, REDUCE);
long *psync = shmem_internal_team_choose_psync(parent_team, REDUCE, nic_idx);
shmem_internal_team_t *myteam = NULL;
*team_ret_val = 0;
*team_ret_val_reduced = 0;
Expand Down Expand Up @@ -366,7 +366,7 @@ int shmem_internal_team_split_strided(shmem_internal_team_t *parent_team, int PE
shmem_internal_op_to_all(psync_pool_avail_reduced,
psync_pool_avail, N_PSYNC_BYTES, 1,
myteam->start, PE_stride, PE_size, NULL,
psync, SHM_INTERNAL_BAND, SHM_INTERNAL_UCHAR);
psync, SHM_INTERNAL_BAND, SHM_INTERNAL_UCHAR, nic_idx);

/* We cannot release the psync here, because this reduction may not
* have been performed on the entire parent team. */
Expand Down Expand Up @@ -406,18 +406,18 @@ int shmem_internal_team_split_strided(shmem_internal_team_t *parent_team, int PE

/* This barrier on the parent team eliminates problematic race conditions
* during psync allocation between back-to-back team creations. */
psync = shmem_internal_team_choose_psync(parent_team, SYNC);
psync = shmem_internal_team_choose_psync(parent_team, SYNC, nic_idx);

shmem_internal_barrier(parent_team->start, parent_team->stride, parent_team->size, psync);
shmem_internal_barrier(parent_team->start, parent_team->stride, parent_team->size, psync, nic_idx);

shmem_internal_team_release_psyncs(parent_team, SYNC);

/* This OR reduction assures all PEs return the same value. */
psync = shmem_internal_team_choose_psync(parent_team, REDUCE);
psync = shmem_internal_team_choose_psync(parent_team, REDUCE, nic_idx);

shmem_internal_op_to_all(team_ret_val_reduced, team_ret_val, 1, sizeof(int),
parent_team->start, parent_team->stride, parent_team->size, NULL,
psync, SHM_INTERNAL_MAX, SHM_INTERNAL_INT);
psync, SHM_INTERNAL_MAX, SHM_INTERNAL_INT, nic_idx);

shmem_internal_team_release_psyncs(parent_team, REDUCE);

Expand All @@ -433,7 +433,7 @@ int shmem_internal_team_split_strided(shmem_internal_team_t *parent_team, int PE
int shmem_internal_team_split_2d(shmem_internal_team_t *parent_team, int xrange,
const shmem_team_config_t *xaxis_config, long xaxis_mask,
shmem_internal_team_t **xaxis_team, const shmem_team_config_t *yaxis_config,
long yaxis_mask, shmem_internal_team_t **yaxis_team)
long yaxis_mask, shmem_internal_team_t **yaxis_team, size_t nic_idx)
{
*xaxis_team = SHMEM_TEAM_INVALID;
*yaxis_team = SHMEM_TEAM_INVALID;
Expand All @@ -460,7 +460,8 @@ int shmem_internal_team_split_2d(shmem_internal_team_t *parent_team, int xrange,
int xsize = (i == num_xteams - 1 && parent_size % xrange) ? parent_size % xrange : xrange;

ret = shmem_internal_team_split_strided(parent_team, start, parent_stride,
xsize, xaxis_config, xaxis_mask, &my_xteam);
xsize, xaxis_config, xaxis_mask, &my_xteam,
nic_idx);
if (ret) {
RAISE_ERROR_MSG("Creation of x-axis team %d of %d failed\n", i+1, num_xteams);
}
Expand All @@ -481,7 +482,8 @@ int shmem_internal_team_split_2d(shmem_internal_team_t *parent_team, int xrange,
int ysize = (remainder && i < remainder) ? yrange + 1 : yrange;

ret = shmem_internal_team_split_strided(parent_team, start, xrange*parent_stride,
ysize, yaxis_config, yaxis_mask, &my_yteam);
ysize, yaxis_config, yaxis_mask, &my_yteam,
nic_idx);
if (ret) {
RAISE_ERROR_MSG("Creation of y-axis team %d of %d failed\n", i+1, num_yteams);
}
Expand All @@ -493,9 +495,9 @@ int shmem_internal_team_split_2d(shmem_internal_team_t *parent_team, int xrange,
}
}

long *psync = shmem_internal_team_choose_psync(parent_team, SYNC);
long *psync = shmem_internal_team_choose_psync(parent_team, SYNC, nic_idx);

shmem_internal_barrier(parent_start, parent_stride, parent_size, psync);
shmem_internal_barrier(parent_start, parent_stride, parent_size, psync, nic_idx);

shmem_internal_team_release_psyncs(parent_team, SYNC);

Expand Down Expand Up @@ -535,7 +537,7 @@ int shmem_internal_team_destroy(shmem_internal_team_t *team)

/* Returns a psync from the given team that can be safely used for the
* specified collective operation. */
long * shmem_internal_team_choose_psync(shmem_internal_team_t *team, shmem_internal_team_op_t op)
long * shmem_internal_team_choose_psync(shmem_internal_team_t *team, shmem_internal_team_op_t op, size_t nic_idx)
{

switch (op) {
Expand All @@ -556,7 +558,7 @@ long * shmem_internal_team_choose_psync(shmem_internal_team_t *team, shmem_inter

size_t psync = team->psync_idx * SHMEM_SYNC_SIZE;
shmem_internal_sync(team->start, team->stride, team->size,
&shmem_internal_psync_barrier_pool[psync]);
&shmem_internal_psync_barrier_pool[psync], nic_idx);

for (int i = 0; i < N_PSYNCS_PER_TEAM; i++) {
team->psync_avail[i] = 1;
Expand Down
7 changes: 4 additions & 3 deletions src/shmem_team.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,20 @@ int shmem_internal_team_translate_pe(shmem_internal_team_t *src_team, int src_pe

int shmem_internal_team_split_strided(shmem_internal_team_t *parent_team, int PE_start, int PE_stride,
int PE_size, const shmem_team_config_t *config, long config_mask,
shmem_internal_team_t **new_team);
shmem_internal_team_t **new_team, size_t nic_idx);

int shmem_internal_team_split_2d(shmem_internal_team_t *parent_team, int xrange,
const shmem_team_config_t *xaxis_config, long xaxis_mask, shmem_internal_team_t **xaxis_team,
const shmem_team_config_t *yaxis_config, long yaxis_mask, shmem_internal_team_t **yaxis_team);
const shmem_team_config_t *yaxis_config, long yaxis_mask, shmem_internal_team_t **yaxis_team,
size_t nic_idx);

int shmem_internal_team_destroy(shmem_internal_team_t *team);

int shmem_internal_team_create_ctx(shmem_internal_team_t *team, long options, shmem_ctx_t *ctx);

int shmem_internal_ctx_get_team(shmem_ctx_t ctx, shmem_internal_team_t **team);

long * shmem_internal_team_choose_psync(shmem_internal_team_t *team, shmem_internal_team_op_t op);
long * shmem_internal_team_choose_psync(shmem_internal_team_t *team, shmem_internal_team_op_t op, size_t nic_idx);

void shmem_internal_team_release_psyncs(shmem_internal_team_t *team, shmem_internal_team_op_t op);

Expand Down
30 changes: 21 additions & 9 deletions src/symmetric_heap_c.c
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,9 @@ shmem_malloc(size_t size)
ret = dlmalloc(size);
SHMEM_MUTEX_UNLOCK(shmem_internal_mutex_alloc);

shmem_internal_barrier_all();
size_t nic_idx = 0;
SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx);
shmem_internal_barrier_all(nic_idx);

return ret;
}
Expand All @@ -313,7 +315,9 @@ shmem_calloc(size_t count, size_t size)
ret = dlcalloc(count, size);
SHMEM_MUTEX_UNLOCK(shmem_internal_mutex_alloc);

shmem_internal_barrier_all();
size_t nic_idx = 0;
SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx);
shmem_internal_barrier_all(nic_idx);

return ret;
}
Expand All @@ -326,7 +330,9 @@ shmem_free(void *ptr)
SHMEM_ERR_CHECK_SYMMETRIC_HEAP(ptr);
}

shmem_internal_barrier_all();
size_t nic_idx = 0;
SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx);
shmem_internal_barrier_all(nic_idx);

shmem_internal_free(ptr);
}
Expand All @@ -344,7 +350,9 @@ shmem_realloc(void *ptr, size_t size)
SHMEM_ERR_CHECK_SYMMETRIC_HEAP(ptr);
}

shmem_internal_barrier_all();
size_t nic_idx = 0;
SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx);
shmem_internal_barrier_all(nic_idx);

SHMEM_MUTEX_LOCK(shmem_internal_mutex_alloc);
if (size == 0 && ptr != NULL) {
Expand All @@ -355,7 +363,7 @@ shmem_realloc(void *ptr, size_t size)
}
SHMEM_MUTEX_UNLOCK(shmem_internal_mutex_alloc);

shmem_internal_barrier_all();
shmem_internal_barrier_all(nic_idx);

return ret;
}
Expand All @@ -376,7 +384,9 @@ shmem_align(size_t alignment, size_t size)
ret = dlmemalign(alignment, size);
SHMEM_MUTEX_UNLOCK(shmem_internal_mutex_alloc);

shmem_internal_barrier_all();
size_t nic_idx = 0;
SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx);
shmem_internal_barrier_all(nic_idx);

return ret;
}
Expand Down Expand Up @@ -430,9 +440,11 @@ shmem_malloc_with_hints(size_t size, long hints)
ret = dlmalloc(size);
SHMEM_MUTEX_UNLOCK(shmem_internal_mutex_alloc);

if (!(hints & SHMEMX_MALLOC_NO_BARRIER))
shmem_internal_barrier_all();

if (!(hints & SHMEMX_MALLOC_NO_BARRIER)) {
size_t nic_idx = 0;
SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx);
shmem_internal_barrier_all(nic_idx);
}
return ret;
}

Expand Down
10 changes: 8 additions & 2 deletions src/teams_c.c4
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,12 @@ shmem_team_split_strided(shmem_team_t parent_team, int PE_start,
{
SHMEM_ERR_CHECK_INITIALIZED();

size_t nic_idx = 0;
SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx);
return shmem_internal_team_split_strided((shmem_internal_team_t *)parent_team,
PE_start, PE_stride, PE_size, config,
config_mask, (shmem_internal_team_t **)new_team);
config_mask, (shmem_internal_team_t **)new_team,
nic_idx);
}

int SHMEM_FUNCTION_ATTRIBUTES
Expand All @@ -128,11 +131,14 @@ shmem_team_split_2d(shmem_team_t parent_team, int xrange,
{
SHMEM_ERR_CHECK_INITIALIZED();

size_t nic_idx = 0;
SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx);
return shmem_internal_team_split_2d((shmem_internal_team_t *)parent_team,
xrange, xaxis_config, xaxis_mask,
(shmem_internal_team_t **)xaxis_team,
yaxis_config, yaxis_mask,
(shmem_internal_team_t **)yaxis_team);
(shmem_internal_team_t **)yaxis_team,
nic_idx);
}

int SHMEM_FUNCTION_ATTRIBUTES
Expand Down

0 comments on commit 2a3ec52

Please sign in to comment.