Skip to content
This repository has been archived by the owner on May 3, 2024. It is now read-only.

CORTX-33917: Enable versioned CAS ops without DTM0 #2075

Merged
merged 20 commits into from
Sep 16, 2022
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions cas/cas.h
Original file line number Diff line number Diff line change
Expand Up @@ -295,18 +295,20 @@ enum m0_cas_op_flags {
*
* Overview
* --------
* Versions are taken from m0_cas_op::cg_txd. When this flag is set,
* the following change happens in the logic of the mentioned request
* types:
* Versions are taken from m0_cas_op::cg_txd. If no transaction
* descriptor is provided, version ID will be implicitly set
* to the current time (to allow for non-DTM0 operation).
* When this flag is set, the following change happens in the
* logic of the mentioned request types:
* - PUT does not overwrite "newest" (version-wise) records.
* Requirements:
* x COF_OVERWRITE is set.
* x Transaction descriptor has a valid DTX ID.
* x Transaction descriptor (if provided) has a valid DTX ID.
* - DEL puts a tombstone instead of an actual removal. In this
* mode, DEL does not return -ENOENT in the same way as
* PUT-with-COF_OVERWRITE does not return -EEXIST.
* Requirements:
* x Transaction descriptor has a valid DTX ID.
* x Transaction descriptor (if provided) has a valid DTX ID.
* - GET returns only "alive" entries (without tombstones).
* - NEXT also skips entries with tombstones.
*
Expand Down
8 changes: 4 additions & 4 deletions cas/ctg_store.c
Original file line number Diff line number Diff line change
Expand Up @@ -2597,7 +2597,7 @@ int ctgdump(struct m0_motr *motr_ctx, char *fidstr, char *dump_in_hex_str)
static bool ctg_op_is_versioned(const struct m0_ctg_op *ctg_op)
{
const struct m0_cas_op *cas_op;
bool has_txd;
bool has_txd_ts;

/*
* Since the versioned behavior is optional, it may get turned off
Expand All @@ -2624,7 +2624,7 @@ static bool ctg_op_is_versioned(const struct m0_ctg_op *ctg_op)
if ((ctg_op->co_flags & COF_VERSIONED) == 0)
return M0_RC_INFO(false, "CAS request is not versioned.");

has_txd = !m0_dtm0_tx_desc_is_none(&cas_op->cg_txd);
has_txd_ts = !m0_dtm0_ts_is_none(&cas_op->cg_txd.dtd_id.dti_ts);

switch (CTG_OP_COMBINE(ctg_op->co_opcode, ctg_op->co_ct)) {
case CTG_OP_COMBINE(CO_PUT, CT_BTREE):
Expand All @@ -2633,11 +2633,11 @@ static bool ctg_op_is_versioned(const struct m0_ctg_op *ctg_op)
"PUT request without OVERWRITE is "
"not versioned.");
case CTG_OP_COMBINE(CO_DEL, CT_BTREE):
if (has_txd)
if (has_txd_ts)
return M0_RC(true);
else
return M0_RC_INFO(false,
"%s request is has an empty txd.",
"%s request has an empty txd timestamp.",
ctg_op->co_opcode == CO_PUT ?
"PUT" : "DEL");
case CTG_OP_COMBINE(CO_GET, CT_BTREE):
Expand Down
7 changes: 5 additions & 2 deletions cas/ut/client_ut.c
Original file line number Diff line number Diff line change
Expand Up @@ -2380,11 +2380,14 @@ static void put_overwrite_ver(void)
M0_UT_ASSERT(has_versions(&index, &keys,
version[V_FUTURE], COF_VERSIONED));

/* However, empty version disables the versioned behavior. */
/*
* However, empty version with COF_VERSIONED uses current timestamp.
* Check that version[V_NONE] got overwritten.
*/
put_get_verified(&index, &keys, &vals[V_PAST], &vals[V_PAST],
version[V_NONE], COF_VERSIONED | COF_OVERWRITE,
COF_VERSIONED);
M0_UT_ASSERT(has_versions(&index, &keys,
M0_UT_ASSERT(!has_versions(&index, &keys,
version[V_NONE], COF_VERSIONED));

rc = ut_idx_delete(&casc_ut_cctx, &ifid, 1, rep);
Expand Down
37 changes: 22 additions & 15 deletions dix/req.c
Original file line number Diff line number Diff line change
Expand Up @@ -1419,15 +1419,10 @@ static void dix_rop(struct m0_dix_req *req)
/** Checks if the given cas get reply has a newer version of the value */
static int dix_item_version_cmp(const struct m0_dix_item *ditem,
const struct m0_cas_get_reply *get_rep) {
/*
* TODO: once cas versions are propagated, check if the get reply
* has a newer version than seen previously. Will need to add
* version info to struct m0_dix_item. This function should return
* true if no previous value is set, or if the previous value has
* an older version. For now, always return true so the last
* reply in the array wins.
*/
return -1;
if (m0_crv_is_none(&get_rep->cge_ver) || m0_crv_is_none(&ditem->dxi_ver)) {
return -1;
}
return m0_crv_cmp(&ditem->dxi_ver, &get_rep->cge_ver);
}

static void dix_item_rc_update(struct m0_dix_req *req,
Expand All @@ -1446,9 +1441,11 @@ static void dix_item_rc_update(struct m0_dix_req *req,
case DIX_GET:
m0_cas_get_rep(creq, key_idx, &get_rep);
rc = get_rep.cge_rc;
if (rc == 0 && dix_item_version_cmp(ditem, &get_rep) < 0) {
if (M0_IN(rc, (0, -ENOENT)) &&
dix_item_version_cmp(ditem, &get_rep) < 0) {
m0_buf_free(&ditem->dxi_val);
ditem->dxi_val = get_rep.cge_val;
ditem->dxi_ver = get_rep.cge_ver;
/* Value will be freed at m0_dix_req_fini(). */
m0_cas_rep_mlock(creq, key_idx);
}
Expand Down Expand Up @@ -1638,7 +1635,7 @@ static void dix_cas_rop_rc_update(struct m0_dix_cas_rop *cas_rop, int rc)
for (i = 0; i < cas_rop->crp_keys_nr; i++) {
item_idx = cas_rop->crp_attrs[i].cra_item;
ditem = &req->dr_items[item_idx];
if (ditem->dxi_rc != 0)
if (!M0_IN(ditem->dxi_rc, (0, -ENOENT)))
continue;
if (rc == 0)
dix_item_rc_update(req, &cas_rop->crp_creq, i, ditem);
Expand Down Expand Up @@ -1777,6 +1774,8 @@ static int dix_cas_rops_send(struct m0_dix_req *req)
struct m0_reqh_service_ctx *cas_svc;
struct m0_dix_layout *layout = &req->dr_indices[0].dd_layout;
int rc;
uint32_t extra_flags;

M0_ENTRY("req=%p", req);

M0_PRE(rop->dg_cas_reqs_nr == 0);
Expand Down Expand Up @@ -1806,21 +1805,29 @@ static int dix_cas_rops_send(struct m0_dix_req *req)

switch (req->dr_type) {
case DIX_GET:
rc = m0_cas_get(creq, &cctg_id,
&cas_rop->crp_keys);
rc = (req->dr_is_meta ?
m0_cas_get : m0_cas_versioned_get)
(creq, &cctg_id,
&cas_rop->crp_keys);
break;
case DIX_PUT:
extra_flags = req->dr_is_meta ?
0 : COF_VERSIONED | COF_OVERWRITE;
rc = m0_cas_put(creq, &cctg_id,
&cas_rop->crp_keys,
&cas_rop->crp_vals,
req->dr_dtx,
cas_rop->crp_flags);
cas_rop->crp_flags |
extra_flags);
break;
case DIX_DEL:
extra_flags = req->dr_is_meta ?
0 : COF_VERSIONED;
rc = m0_cas_del(creq, &cctg_id,
&cas_rop->crp_keys,
req->dr_dtx,
cas_rop->crp_flags);
cas_rop->crp_flags |
extra_flags);
break;
case DIX_NEXT:
rc = m0_cas_next(creq, &cctg_id,
Expand Down
2 changes: 2 additions & 0 deletions dix/req_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "sm/sm.h" /* m0_sm_ast */
#include "pool/pool.h" /* m0_pool_nd_state */
#include "cas/client.h" /* m0_cas_req */
#include "cas/cas.h" /* m0_crv */

struct m0_dix_req;
struct m0_pool_version;
Expand All @@ -46,6 +47,7 @@ struct m0_dix_next_resultset;
struct m0_dix_item {
struct m0_buf dxi_key;
struct m0_buf dxi_val;
struct m0_crv dxi_ver;
/**
* Current parity group unit number for GET request. GET request is sent
* to target holding data unit at first. If this request fails, then
Expand Down
Loading