Skip to content

Commit

Permalink
Dynamic trailing computation
Browse files Browse the repository at this point in the history
  • Loading branch information
marco6 committed Oct 18, 2024
1 parent 76f1e7c commit 56abe80
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 5 deletions.
38 changes: 34 additions & 4 deletions src/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,28 @@ struct raft_transfer; /* Forward declaration */

struct raft_log;

/**
 * How the number of trailing log entries retained after a snapshot is
 * determined. Select one with raft_set_snapshot_trailing_strategy().
 */
enum {
	/**
	 * Retain a fixed number of entries in the log cache, namely the value
	 * configured through raft_set_snapshot_trailing(). This is the default
	 * strategy.
	 */
	RAFT_TRAILING_STRATEGY_STATIC = 0,

	/**
	 * Derive the number of entries to retain by weighing the size held by
	 * the entries against the size of the snapshot: when the entries are
	 * too big, streaming the snapshot might be the better option. The
	 * value configured through raft_set_snapshot_trailing() remains an
	 * upper limit on the number of entries.
	 */
	RAFT_TRAILING_STRATEGY_DYNAMIC = 1
};

/**
* Hold and drive the state of a single raft server in a cluster.
* When replacing reserved fields in the middle of this struct, you MUST use a
Expand Down Expand Up @@ -964,7 +986,10 @@ struct raft
unsigned trailing; /* N. of trailing entries to retain */
struct raft_snapshot pending; /* In progress snapshot */
struct raft_io_snapshot_put put; /* Store snapshot request */
uint64_t reserved[8]; /* Future use */
uint8_t trailing_strategy;
/* Future use */
uint8_t reserved2[7];
uint64_t reserved[7];
} snapshot;

/*
Expand Down Expand Up @@ -1107,12 +1132,17 @@ RAFT_API void raft_set_snapshot_threshold(struct raft *r, unsigned n);
RAFT_API void raft_set_pre_vote(struct raft *r, bool enabled);

/**
 * Maximum number of outstanding log entries to keep in the log after a
 * snapshot has been taken. This avoids sending snapshots when a follower is
 * behind by just a few entries. The default is 2048.
 */
RAFT_API void raft_set_snapshot_trailing(struct raft *r, unsigned n);

/**
 * Select the strategy used to compute the amount of trailing entries to keep
 * after a snapshot. @strategy is expected to be one of the
 * RAFT_TRAILING_STRATEGY_* values; any other value selects
 * RAFT_TRAILING_STRATEGY_STATIC. The default is RAFT_TRAILING_STRATEGY_STATIC.
 */
RAFT_API void raft_set_snapshot_trailing_strategy(struct raft *r, int strategy);

/**
* Set the maximum number of a catch-up rounds to try when replicating entries
* to a stand-by server that is being promoted to voter, before giving up and
Expand Down
15 changes: 15 additions & 0 deletions src/raft/raft.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "../raft.h"

#include <limits.h>
#include <stdint.h>
#include <string.h>

#include "../tracing.h"
Expand Down Expand Up @@ -94,6 +95,7 @@ int raft_init(struct raft *r,
r->snapshot.pending.term = 0;
r->snapshot.threshold = DEFAULT_SNAPSHOT_THRESHOLD;
r->snapshot.trailing = DEFAULT_SNAPSHOT_TRAILING;
r->snapshot.trailing_strategy = RAFT_TRAILING_STRATEGY_STATIC;
r->snapshot.put.data = NULL;
r->close_cb = NULL;
memset(r->errmsg, 0, sizeof r->errmsg);
Expand Down Expand Up @@ -172,6 +174,19 @@ void raft_set_snapshot_trailing(struct raft *r, unsigned n)
r->snapshot.trailing = n;
}

void raft_set_snapshot_trailing_strategy(struct raft *r, int strategy)

Check warning on line 177 in src/raft/raft.c

View check run for this annotation

Codecov / codecov/patch

src/raft/raft.c#L177

Added line #L177 was not covered by tests
{
switch (strategy) {
case RAFT_TRAILING_STRATEGY_STATIC:

Check warning on line 180 in src/raft/raft.c

View check run for this annotation

Codecov / codecov/patch

src/raft/raft.c#L180

Added line #L180 was not covered by tests
case RAFT_TRAILING_STRATEGY_DYNAMIC:
r->snapshot.trailing_strategy = (uint8_t)strategy;
break;
default:
r->snapshot.trailing_strategy = RAFT_TRAILING_STRATEGY_STATIC;
break;

Check warning on line 186 in src/raft/raft.c

View check run for this annotation

Codecov / codecov/patch

src/raft/raft.c#L182-L186

Added lines #L182 - L186 were not covered by tests
}
}

Check warning on line 188 in src/raft/raft.c

View check run for this annotation

Codecov / codecov/patch

src/raft/raft.c#L188

Added line #L188 was not covered by tests

void raft_set_max_catch_up_rounds(struct raft *r, unsigned n)
{
r->max_catch_up_rounds = n;
Expand Down
44 changes: 43 additions & 1 deletion src/raft/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "configuration.h"
#include "convert.h"
#include "entry.h"
#include "src/raft.h"
#ifdef __GLIBC__
#include "error.h"
#endif
Expand Down Expand Up @@ -1674,6 +1675,47 @@ static void takeSnapshotClose(struct raft *r, struct raft_snapshot *s)
r->fsm->snapshot_finalize(r->fsm, &s->bufs, &s->n_bufs);
}


static unsigned dynamicTrailingIndex(struct raft *r, struct raft_snapshot *snapshot) {
struct raft_log *l = r->log;
unsigned threshold = (unsigned)(l->front - l->back);
size_t size = 0;
unsigned i;

Check warning on line 1683 in src/raft/replication.c

View check run for this annotation

Codecov / codecov/patch

src/raft/replication.c#L1679-L1683

Added lines #L1679 - L1683 were not covered by tests

/**
* This should never happen as it would mean that the snapshot
* contains no new entry (i.e. either the snapshot is empty
* or equal to the previous one).
*/
assert(threshold > 0);
threshold = min(r->snapshot.threshold, threshold);

Check warning on line 1691 in src/raft/replication.c

View check run for this annotation

Codecov / codecov/patch

src/raft/replication.c#L1690-L1691

Added lines #L1690 - L1691 were not covered by tests

for (i = 0; i < snapshot->n_bufs; i++) {
size += snapshot->bufs[i].len;

Check warning on line 1694 in src/raft/replication.c

View check run for this annotation

Codecov / codecov/patch

src/raft/replication.c#L1694

Added line #L1694 was not covered by tests
}

for (i = 1; i <= threshold; i++) {
struct raft_entry* entry = &(l->entries[l->back - i]);

Check warning on line 1698 in src/raft/replication.c

View check run for this annotation

Codecov / codecov/patch

src/raft/replication.c#L1698

Added line #L1698 was not covered by tests
if (entry->buf.len > size) {
return i;

Check warning on line 1700 in src/raft/replication.c

View check run for this annotation

Codecov / codecov/patch

src/raft/replication.c#L1700

Added line #L1700 was not covered by tests
}
size -= entry->buf.len;

Check warning on line 1702 in src/raft/replication.c

View check run for this annotation

Codecov / codecov/patch

src/raft/replication.c#L1702

Added line #L1702 was not covered by tests
}

return threshold;
}


/* Return the number of trailing entries to pass to logSnapshot() for the given
 * snapshot, according to the configured trailing strategy. Only the dynamic
 * strategy inspects the snapshot; the static strategy, and any unrecognised
 * value, yield the fixed r->snapshot.trailing amount. */
static unsigned trailingSize(struct raft *r, struct raft_snapshot *snapshot)
{
	if (r->snapshot.trailing_strategy == RAFT_TRAILING_STRATEGY_DYNAMIC) {
		return dynamicTrailingIndex(r, snapshot);
	}

	return r->snapshot.trailing;
}

static void takeSnapshotCb(struct raft_io_snapshot_put *req, int status)
{
struct raft *r = req->data;
Expand Down Expand Up @@ -1702,7 +1744,7 @@ static void takeSnapshotCb(struct raft_io_snapshot_put *req, int status)
* an aborted configuration change. */
tracef("failed to backup last committed configuration.");
}
logSnapshot(r->log, snapshot->index, r->snapshot.trailing);
logSnapshot(r->log, snapshot->index, trailingSize(r, snapshot));
out:
takeSnapshotClose(r, snapshot);
r->snapshot.pending.term = 0;
Expand Down

0 comments on commit 56abe80

Please # to comment.