Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Phil/pruning paranoia #363

Merged
merged 3 commits into from
Feb 6, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 59 additions & 12 deletions cmd/gazctl/gazctlcmd/shards_prune.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ func (cmd *cmdShardsPrune) Execute([]string) error {
var metrics = shardsPruneMetrics{}
var logSegmentSets = make(map[pb.Journal][]recoverylog.Segment)
var skipRecoveryLogs = make(map[pb.Journal]bool)
// Retain the raw hints responses, so that we can log them if they're used to prune fragments
var rawHintsResponses = make(map[pb.Journal][]pc.GetHintsResponse)

for _, shard := range listShards(rsc, cmd.Selector).Shards {
metrics.shardsTotal++
Expand All @@ -59,6 +61,7 @@ func (cmd *cmdShardsPrune) Execute([]string) error {
mbp.Must(err, "failed to fetch hints")

var recoveryLog = shard.Spec.RecoveryLog()
rawHintsResponses[recoveryLog] = append(rawHintsResponses[recoveryLog], *allHints)

for _, curHints := range append(allHints.BackupHints, allHints.PrimaryHints) {
var hints = curHints.Hints
Expand All @@ -82,7 +85,7 @@ func (cmd *cmdShardsPrune) Execute([]string) error {
"shard": shard.Spec.Id,
"reason": reason,
"journal": recoveryLog,
}).Warn("will skip pruning recovery log journal")
}).Debug("will skip pruning recovery log journal")

break
}
Expand All @@ -91,35 +94,71 @@ func (cmd *cmdShardsPrune) Execute([]string) error {

for journal, segments := range logSegmentSets {
if skipRecoveryLogs[journal] {
log.WithField("journal", journal).Warn("skipping journal because another shard is missing hints that cover it")
log.WithField("journal", journal).Warn("skipping journal because a shard is missing hints that cover it")
continue
}
log.WithField("journal", journal).Debug("checking fragments of journal")
var prunedFragments []pb.Fragment
for _, f := range fetchFragments(ctx, rjc, journal) {
var spec = f.Spec

metrics.fragmentsTotal++
metrics.bytesTotal += spec.ContentLength()

if !overlapsAnySegment(segments, spec) {
if spec.ModTime == 0 {
// This shouldn't ever happen, as long as the label selector covers all shards that are using
// each journal. But we don't validate that up front, so failing fast is the next best thing.
log.WithFields(log.Fields{
"journal": spec.Journal,
"name": spec.ContentName(),
"begin": spec.Begin,
"end": spec.End,
"hintedSegments": segments,
}).Fatal("unpersisted fragment does not overlap any hinted segments (the label selector argument does not include all shards using this log)")
}
log.WithFields(log.Fields{
"log": spec.Journal,
"name": spec.ContentName(),
"size": spec.ContentLength(),
"mod": spec.ModTime,
}).Info("pruning fragment")

metrics.fragmentsPruned++
metrics.bytesPruned += spec.ContentLength()

"log": spec.Journal,
"name": spec.ContentName(),
"size": spec.ContentLength(),
"mod": spec.ModTime,
"begin": spec.Begin,
"end": spec.End,
}).Debug("pruning fragment")

var removed = true
if !cmd.DryRun {
mbp.Must(fragment.Remove(ctx, spec), "error removing fragment", "path", spec.ContentPath())
if err := fragment.Remove(ctx, spec); err != nil {
removed = false
metrics.failedToRemove++
log.WithFields(log.Fields{
"fragment": spec,
"error": err,
}).Warn("failed to remove fragment (skipping)")
}
}
if removed {
metrics.fragmentsPruned++
metrics.bytesPruned += spec.ContentLength()
prunedFragments = append(prunedFragments, spec)
}
}
}
if len(prunedFragments) > 0 {
log.WithFields(log.Fields{
"journal": journal,
"allHints": rawHintsResponses[journal],
"liveSegments": segments,
"prunedFragments": prunedFragments,
}).Info("pruned fragments")
}
logShardsPruneMetrics(metrics, journal.String(), "finished pruning log")
}
logShardsPruneMetrics(metrics, "", "finished pruning logs for all shards")

if metrics.failedToRemove > 0 {
log.WithField("failures", metrics.failedToRemove).Fatal("failed to remove fragments")
}
return nil
}

Expand All @@ -130,6 +169,12 @@ func overlapsAnySegment(segments []recoverylog.Segment, fragment pb.Fragment) bo
return true
}
}
if len(segments) == 0 {
log.WithFields(log.Fields{
"log": fragment.Journal,
}).Warn("no live segments for log")
return true
}
return false
}

Expand Down Expand Up @@ -170,6 +215,7 @@ type shardsPruneMetrics struct {
bytesTotal int64
bytesPruned int64
skippedJournals int64
failedToRemove int64
}

func logShardsPruneMetrics(m shardsPruneMetrics, journal, message string) {
Expand All @@ -182,6 +228,7 @@ func logShardsPruneMetrics(m shardsPruneMetrics, journal, message string) {
"bytesPruned": m.bytesPruned,
"bytesKept": m.bytesTotal - m.bytesPruned,
"skippedJournals": m.skippedJournals,
"failedToRemove": m.failedToRemove,
}
if journal != "" {
fields["journal"] = journal
Expand Down
Loading