diff --git a/cmd/gazctl/gazctlcmd/shards_prune.go b/cmd/gazctl/gazctlcmd/shards_prune.go index 20910640..beb0ba7c 100644 --- a/cmd/gazctl/gazctlcmd/shards_prune.go +++ b/cmd/gazctl/gazctlcmd/shards_prune.go @@ -49,6 +49,8 @@ func (cmd *cmdShardsPrune) Execute([]string) error { var metrics = shardsPruneMetrics{} var logSegmentSets = make(map[pb.Journal][]recoverylog.Segment) var skipRecoveryLogs = make(map[pb.Journal]bool) + // Retain the raw hints responses, so that we can log them if they're used to prune fragments + var rawHintsResponses = make(map[pb.Journal][]pc.GetHintsResponse) for _, shard := range listShards(rsc, cmd.Selector).Shards { metrics.shardsTotal++ @@ -59,6 +61,7 @@ func (cmd *cmdShardsPrune) Execute([]string) error { mbp.Must(err, "failed to fetch hints") var recoveryLog = shard.Spec.RecoveryLog() + rawHintsResponses[recoveryLog] = append(rawHintsResponses[recoveryLog], *allHints) for _, curHints := range append(allHints.BackupHints, allHints.PrimaryHints) { var hints = curHints.Hints @@ -82,7 +85,7 @@ func (cmd *cmdShardsPrune) Execute([]string) error { "shard": shard.Spec.Id, "reason": reason, "journal": recoveryLog, - }).Warn("will skip pruning recovery log journal") + }).Debug("will skip pruning recovery log journal") break } @@ -91,10 +94,11 @@ func (cmd *cmdShardsPrune) Execute([]string) error { for journal, segments := range logSegmentSets { if skipRecoveryLogs[journal] { - log.WithField("journal", journal).Warn("skipping journal because another shard is missing hints that cover it") + log.WithField("journal", journal).Warn("skipping journal because a shard is missing hints that cover it") continue } log.WithField("journal", journal).Debug("checking fragments of journal") + var prunedFragments []pb.Fragment for _, f := range fetchFragments(ctx, rjc, journal) { var spec = f.Spec @@ -102,24 +106,59 @@ func (cmd *cmdShardsPrune) Execute([]string) error { metrics.bytesTotal += spec.ContentLength() if !overlapsAnySegment(segments, spec) { + if spec.ModTime == 0 { + // This shouldn't ever happen, as long as the label selector covers all shards that are using + // each journal. But we don't validate that up front, so failing fast is the next best thing. + log.WithFields(log.Fields{ + "journal": spec.Journal, + "name": spec.ContentName(), + "begin": spec.Begin, + "end": spec.End, + "hintedSegments": segments, + }).Fatal("unpersisted fragment does not overlap any hinted segments (the label selector argument does not include all shards using this log)") + } log.WithFields(log.Fields{ - "log": spec.Journal, - "name": spec.ContentName(), - "size": spec.ContentLength(), - "mod": spec.ModTime, - }).Info("pruning fragment") - - metrics.fragmentsPruned++ - metrics.bytesPruned += spec.ContentLength() - + "log": spec.Journal, + "name": spec.ContentName(), + "size": spec.ContentLength(), + "mod": spec.ModTime, + "begin": spec.Begin, + "end": spec.End, + }).Debug("pruning fragment") + + var removed = true if !cmd.DryRun { - mbp.Must(fragment.Remove(ctx, spec), "error removing fragment", "path", spec.ContentPath()) + if err := fragment.Remove(ctx, spec); err != nil { + removed = false + metrics.failedToRemove++ + log.WithFields(log.Fields{ + "fragment": spec, + "error": err, + }).Warn("failed to remove fragment (skipping)") + } + } + if removed { + metrics.fragmentsPruned++ + metrics.bytesPruned += spec.ContentLength() + prunedFragments = append(prunedFragments, spec) } } } + if len(prunedFragments) > 0 { + log.WithFields(log.Fields{ + "journal": journal, + "allHints": rawHintsResponses[journal], + "liveSegments": segments, + "prunedFragments": prunedFragments, + }).Info("pruned fragments") + } logShardsPruneMetrics(metrics, journal.String(), "finished pruning log") } logShardsPruneMetrics(metrics, "", "finished pruning logs for all shards") + + if metrics.failedToRemove > 0 { + log.WithField("failures", metrics.failedToRemove).Fatal("failed to remove fragments") + } return nil } @@ -130,6 +169,12 @@ func overlapsAnySegment(segments []recoverylog.Segment, fragment pb.Fragment) bo return true } } + if len(segments) == 0 { + log.WithFields(log.Fields{ + "log": fragment.Journal, + }).Warn("no live segments for log") + return true + } return false } @@ -170,6 +215,7 @@ type shardsPruneMetrics struct { bytesTotal int64 bytesPruned int64 skippedJournals int64 + failedToRemove int64 } func logShardsPruneMetrics(m shardsPruneMetrics, journal, message string) { @@ -182,6 +228,7 @@ func logShardsPruneMetrics(m shardsPruneMetrics, journal, message string) { "bytesPruned": m.bytesPruned, "bytesKept": m.bytesTotal - m.bytesPruned, "skippedJournals": m.skippedJournals, + "failedToRemove": m.failedToRemove, } if journal != "" { fields["journal"] = journal