Skip to content

Commit

Permalink
Merge pull request openucx#8 from zheng871026/huawei
Browse files Browse the repository at this point in the history
fix big packet for discontig datatype
  • Loading branch information
nsosnsos authored Nov 28, 2020
2 parents 5158822 + e148df8 commit 9dd57be
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 11 deletions.
4 changes: 2 additions & 2 deletions builtin/ops/builtin_cb.inl
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ static int ucg_builtin_comp_recv_noncontig_one_cb(ucg_builtin_request_t *req,
uint64_t offset, void *data, size_t length)
{
req->op->recv_dt->ops.unpack(req->step->bcopy.unpack_state.dt.generic.state,
offset, data, length / req->op->super.params.recv.count);
offset, data, length);
(void) ucg_builtin_comp_step_cb(req, NULL);
return 1;
}
Expand All @@ -187,7 +187,7 @@ static int ucg_builtin_comp_recv_noncontig_one_then_send_cb(ucg_builtin_request_
uint64_t offset, void *data, size_t length)
{
req->op->recv_dt->ops.unpack(req->step->bcopy.unpack_state.dt.generic.state,
offset, data, length / req->op->super.params.recv.count);
offset, data, length);
req->recv_comp = 1;
(void) ucg_builtin_step_execute(req, NULL);
return 1;
Expand Down
20 changes: 11 additions & 9 deletions builtin/ops/builtin_ops.c
Original file line number Diff line number Diff line change
Expand Up @@ -884,11 +884,11 @@ static UCS_F_ALWAYS_INLINE void ucg_builtin_step_fragment_flags(size_t thresh_on
static UCS_F_ALWAYS_INLINE ucs_status_t ucg_builtin_step_recv_flags(ucg_builtin_op_step_t *step,
ucg_builtin_plan_phase_t *phase,
const ucg_collective_params_t *params,
size_t dt_len, int is_recv_contig,
enum ucg_builtin_op_step_flags *recv_flag)
{
*recv_flag = (enum ucg_builtin_op_step_flags)0;
size_t length = step->buffer_length;
size_t dt_len = params->send.dt_len;
size_t fragment_length = 0;
unsigned partial_length = 0;

Expand All @@ -900,18 +900,18 @@ static UCS_F_ALWAYS_INLINE ucs_status_t ucg_builtin_step_recv_flags(ucg_builtin_
/*
* Short messages (e.g. RDMA "inline")
*/
if (length <= phase->recv_thresh.max_short_one) {
if (length <= phase->recv_thresh.max_short_one && is_recv_contig) {
/* Short send - single message */
step->fragments_recv = 1;
} else if (length <= phase->recv_thresh.max_short_max) {
} else if (length <= phase->recv_thresh.max_short_max && is_recv_contig) {
/* Short send - multiple messages */
ucg_builtin_step_fragment_flags(phase->recv_thresh.max_short_one, dt_len, length,
step, phase, recv_flag);
/*
* Large messages, if supported (e.g. RDMA "zero-copy")
*/
} else if ((length > phase->recv_thresh.max_bcopy_max) &&
(length <= phase->recv_thresh.md_attr_cap_max_reg)) {
(length <= phase->recv_thresh.md_attr_cap_max_reg) && is_recv_contig) {
if (length < phase->recv_thresh.max_zcopy_one) {
/* ZCopy send - single message */
step->fragments_recv = 1;
Expand All @@ -920,7 +920,6 @@ static UCS_F_ALWAYS_INLINE ucs_status_t ucg_builtin_step_recv_flags(ucg_builtin_
ucg_builtin_step_fragment_flags(phase->recv_thresh.max_zcopy_one, dt_len, length,
step, phase, recv_flag);
}

/*
* Medium messages
*/
Expand Down Expand Up @@ -953,12 +952,13 @@ static UCS_F_ALWAYS_INLINE ucs_status_t ucg_builtin_step_recv_flags(ucg_builtin_
static inline size_t
ucg_builtin_step_get_gen_dt_length(ucg_builtin_op_step_t *step,
ucp_datatype_t datatype,
const ucg_collective_params_t *params)
const ucg_collective_params_t *params,
int count)
{
/* need to generate a one-time state to figure out the packed size */
ucp_dt_generic_t *dt_gen = ucp_dt_generic(datatype);
ucg_builtin_init_state(step, 1, dt_gen, params);
size_t len = dt_gen->ops.packed_size(step->bcopy.pack_state.dt.generic.state);
size_t len = dt_gen->ops.packed_size(step->bcopy.pack_state.dt.generic.state) / count;
ucg_builtin_finalize_state(step, 1, dt_gen);
return len;
}
Expand All @@ -977,7 +977,7 @@ ucs_status_t ucg_builtin_step_create(ucg_builtin_plan_phase_t *phase,
/* Set the parameters determining the send-flags later on */
int is_send_contig = UCP_DT_IS_CONTIG(send_dtype);
size_t dt_len = is_send_contig ? params->send.dt_len :
ucg_builtin_step_get_gen_dt_length(step, send_dtype, params);
ucg_builtin_step_get_gen_dt_length(step, send_dtype, params, params->send.count);
step->buffer_length = dt_len * params->send.count;
step->uct_md = phase->md;
if (phase->md) {
Expand Down Expand Up @@ -1208,7 +1208,9 @@ ucs_status_t ucg_builtin_step_create(ucg_builtin_plan_phase_t *phase,
ucs_error("Invalid method for a collective operation.");
return UCS_ERR_INVALID_PARAM;
}
status = ucg_builtin_step_recv_flags(step, phase, params, &recv_flag);
dt_len = is_recv_contig ? params->recv.dt_len :
ucg_builtin_step_get_gen_dt_length(step, recv_dtype, params, params->recv.count);
status = ucg_builtin_step_recv_flags(step, phase, params, dt_len, is_recv_contig, &recv_flag);
if (status != UCS_OK) {
return status;
}
Expand Down

0 comments on commit 9dd57be

Please # to comment.