Skip to content

Commit

Permalink
Merge pull request openucx#9 from RainybIue/huawei
Browse files Browse the repository at this point in the history
Fixed the segmentation fault in ucg_builtin_am_handler
  • Loading branch information
nsosnsos authored Dec 3, 2020
2 parents 6b6feb2 + 6ed9f6a commit 41b0d58
Showing 1 changed file with 51 additions and 16 deletions.
67 changes: 51 additions & 16 deletions builtin/builtin.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,18 @@ struct ucg_builtin_group_ctx {
ucg_builtin_comp_slot_t slots[UCG_BUILTIN_MAX_CONCURRENT_OPS];
};

/*
 * Holding area for a single active message that arrived for a group ID which
 * is not yet covered by the worker's slot table. ucg_builtin_am_handler()
 * fills it in, and ucg_builtin_create() processes and releases it once the
 * group with the matching ID has been created.
 */
typedef struct ucg_builtin_am_buffer {
/* Group ID copied from the header of the stashed message */
int group_id;
/* 1 while a message is stashed here, reset to 0 after it has been processed */
char used;
/* The message pointer exactly as handed to the active-message handler
 * (the payload is not copied) */
void *data;
/* Message length as handed to the active-message handler */
size_t length;
/* Active-message flags as handed to the handler (UCT_CB_PARAM_FLAG_* bits) */
unsigned am_flags;
}ucg_builtin_am_buffer_t;

/*
 * Per-worker context of the builtin component: a table of per-group slot
 * arrays indexed by group ID, plus a one-message stash for traffic that
 * arrives before its group exists locally.
 */
typedef struct ucg_builtin_ctx {
/* Number of entries in slots[]; group IDs below this value can be looked up */
unsigned slots_total;
/* Usage counter maintained by ucg_builtin_create()
 * (presumably the number of groups currently registered - TODO confirm) */
unsigned slots_used;
/* Single stashed message for a group that has not been created yet */
ucg_builtin_am_buffer_t buffer;
/* Flexible array: slots[group_id] points to that group's array of
 * UCG_BUILTIN_MAX_CONCURRENT_OPS completion slots */
ucg_builtin_comp_slot_t *slots[];
} ucg_builtin_ctx_t;

Expand Down Expand Up @@ -147,18 +156,12 @@ enum ucg_builtin_plan_topology_type ucg_builtin_choose_type(enum ucg_collective_
return UCG_PLAN_TREE_FANIN_FANOUT;
}

UCS_PROFILE_FUNC(ucs_status_t, ucg_builtin_am_handler,
(arg, data, length, am_flags),
void *arg, void *data, size_t length, unsigned am_flags)
{
ucg_builtin_header_t* header = data;
ucg_builtin_ctx_t **ctx = UCG_WORKER_TO_COMPONENT_CTX(ucg_builtin_component, arg);
ucg_builtin_comp_slot_t *slot = &(*ctx)->slots[header->group_id]
[header->coll_id % UCG_BUILTIN_MAX_CONCURRENT_OPS];
ucs_assert(header->group_id < (*ctx)->slots_total);
ucs_assert(length >= sizeof(header));
static ucs_status_t ucg_builtin_am_process(ucg_builtin_comp_slot_t *slot, void *data, size_t length,
unsigned am_flags)

/* Consume the message if it fits the current collective and step index */
{
ucg_builtin_header_t *header = data;
/* Consume the message if it fits the current collective and step index */
if (ucs_likely(slot->cb && (header->local_id == slot->local_id))) {
/* Make sure the packet indeed belongs to the collective currently on */
ucs_debug("ucg_builtin_am_handler CB: coll_id %u step_idx %u cb %p pending %u",
Expand Down Expand Up @@ -234,6 +237,34 @@ UCS_PROFILE_FUNC(ucs_status_t, ucg_builtin_am_handler,
return ret;
}

/*
 * Active-message callback of the builtin component.
 *
 * Looks up the completion slot addressed by the message header (group ID and
 * collective ID) and passes the message on to ucg_builtin_am_process(). When
 * the group ID is not covered by the slot table yet, the message is stashed
 * in the per-worker buffer so that ucg_builtin_create() can process it once
 * the group has been created.
 *
 * @param arg       Worker handle; resolved to the builtin per-worker context.
 * @param data      Start of the incoming message, beginning with a
 *                  ucg_builtin_header_t.
 * @param length    Length of the incoming message, header included.
 * @param am_flags  Active-message flags (UCT_CB_PARAM_FLAG_* bits).
 *
 * @return The status of ucg_builtin_am_process() for a known group. For a
 *         stashed message: UCS_INPROGRESS if UCT_CB_PARAM_FLAG_DESC is set,
 *         UCS_OK otherwise.
 */
UCS_PROFILE_FUNC(ucs_status_t, ucg_builtin_am_handler,
                 (arg, data, length, am_flags),
                 void *arg, void *data, size_t length, unsigned am_flags)
{
    ucg_builtin_header_t *header = data;
    ucg_builtin_ctx_t **ctx      = UCG_WORKER_TO_COMPONENT_CTX(ucg_builtin_component, arg);
    ucg_builtin_comp_slot_t *slot;
    ucg_builtin_am_buffer_t *buffer;
    ucg_group_id_t group_id;

    /* Validate the length before touching any header field. The size of the
     * header structure is what matters here, not the size of the pointer. */
    ucs_assert(length >= sizeof(*header));
    group_id = header->group_id;

    if ((*ctx)->slots_total > group_id) {
        slot = &(*ctx)->slots[group_id][header->coll_id % UCG_BUILTIN_MAX_CONCURRENT_OPS];
        return ucg_builtin_am_process(slot, data, length, am_flags);
    }

    /*
     * Ranks A and B may both be creating the same new group. This is a race:
     * A may send a message to B before B has finished creating the group, in
     * which case the incoming group ID is not below slots_total yet.
     * The message is therefore stored here and processed by
     * ucg_builtin_create() once B creates the group.
     *
     * NOTE(review): only one message can be stashed; a second early message
     * overwrites the first one - confirm this cannot happen in practice.
     * NOTE(review): when UCT_CB_PARAM_FLAG_DESC is not set this returns
     * UCS_OK while still keeping the raw data pointer; verify that the
     * pointer remains valid until ucg_builtin_create() consumes it.
     */
    buffer           = &(*ctx)->buffer;
    buffer->data     = data;
    buffer->group_id = group_id;
    buffer->length   = length;
    buffer->am_flags = am_flags;
    buffer->used     = 1;

    return (am_flags & UCT_CB_PARAM_FLAG_DESC) ? UCS_INPROGRESS : UCS_OK;
}

void ucg_builtin_msg_dump(ucp_worker_h worker, uct_am_trace_type_t type,
uint8_t id, const void *data, size_t length,
char *buffer, size_t max)
Expand Down Expand Up @@ -291,15 +322,11 @@ static ucs_status_t ucg_builtin_create(ucg_plan_component_t *plan_component,
*bctx = temp;
return UCS_ERR_NO_MEMORY;
}

(*bctx)->slots_total = group_id + 1;
if (temp == NULL) {
(*bctx)->slots_used = 0;
}
(*bctx)->slots_used = (temp == NULL) ? 0 : (*bctx)->slots_used;
} else {
(*bctx)->slots_used++;
}

/* Fill in the information in the per-group context */
ucg_builtin_group_ctx_t *gctx =
UCG_GROUP_TO_COMPONENT_CTX(ucg_builtin_component, group);
Expand All @@ -324,6 +351,14 @@ static ucs_status_t ucg_builtin_create(ucg_plan_component_t *plan_component,
/* Link the two contexts */
(*bctx)->slots[group_id] = gctx->slots;

if ((*bctx)->buffer.used == 1 && (*bctx)->buffer.group_id == group_id) {
ucg_builtin_am_buffer_t *buffer = &(*bctx)->buffer;
ucg_builtin_header_t *header = buffer->data;
(void)ucg_builtin_am_process(&gctx->slots[header->coll_id], buffer->data,
buffer->length, buffer->am_flags);
buffer->used = 0;
}

return ucg_builtin_init_plan_config(plan_component);
}

Expand Down

0 comments on commit 41b0d58

Please sign in to comment.