/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
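
// Package deleter implements the cleanup and deletion of released local
// PersistentVolumes allocated by this provisioner.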
package deleter

import (
	"bufio"
	"fmt"
	"os"
	"os/exec"
	"strings"
	"sync"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/klog/v2"

	"sigs.k8s.io/sig-storage-local-static-provisioner/pkg/common"
	"sigs.k8s.io/sig-storage-local-static-provisioner/pkg/metrics"
)

// CleanupState indicates the state of the cleanup process.
type CleanupState int

const (
	// CSUnknown State of the cleanup is unknown.
	CSUnknown CleanupState = iota + 1
	// CSNotFound No cleanup process was found.
	CSNotFound
	// CSRunning Cleanup process is still running.
	CSRunning
	// CSFailed Cleanup process has ended in failure.
	CSFailed
	// CSSucceeded Cleanup process has ended successfully.
	CSSucceeded
)

// Deleter handles PV cleanup and object deletion.
// For file-based volumes, it deletes the contents of the directory.
type Deleter struct {
	*common.RuntimeConfig
	CleanupStatus *CleanupStatusTracker
}

// NewDeleter creates a Deleter object to handle the cleanup and deletion of
// local PVs allocated by this provisioner.
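//
// A minimal usage sketch (assuming cfg is a populated *common.RuntimeConfig
// and tracker wraps an initialized ProcTable and JobController; callers
// typically invoke DeletePVs periodically, e.g. via wait.Forever):
//
//	deleter := NewDeleter(cfg, tracker)
//	deleter.DeletePVs()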
func NewDeleter(config *common.RuntimeConfig, cleanupTracker *CleanupStatusTracker) *Deleter {
	return &Deleter{
		RuntimeConfig: config,
		CleanupStatus: cleanupTracker,
	}
}

// DeletePVs scans through all the existing PVs that are released, and cleans
// up and deletes them.
func (d *Deleter) DeletePVs() {
	for _, pv := range d.Cache.ListPVs() {
		if pv.Status.Phase != v1.VolumeReleased {
			continue
		}
		// Do not clean PVs that were already deleted;
		// they may disappear at any time.
		if !pv.DeletionTimestamp.IsZero() {
			continue
		}
		name := pv.Name
		switch pv.Spec.PersistentVolumeReclaimPolicy {
		case v1.PersistentVolumeReclaimRetain:
			klog.V(4).Infof("reclaimVolume[%s]: policy is Retain, nothing to do", name)
		case v1.PersistentVolumeReclaimRecycle:
			klog.V(4).Infof("reclaimVolume[%s]: policy is Recycle which is not supported", name)
			d.RuntimeConfig.Recorder.Eventf(pv, v1.EventTypeWarning, "VolumeUnsupportedReclaimPolicy", "Volume has unsupported PersistentVolumeReclaimPolicy: Recycle")
		case v1.PersistentVolumeReclaimDelete:
			klog.V(4).Infof("reclaimVolume[%s]: policy is Delete", name)
			// Clean up the volume; on failure, record a metric and an event.
			err := d.deletePV(pv)
			if err != nil {
				mode, modeErr := d.getVolMode(pv)
				if modeErr != nil {
					mode = "unknown"
				}
				deleteType := metrics.DeleteTypeProcess
				if d.shouldRunJob(mode) {
					deleteType = metrics.DeleteTypeJob
				}
				metrics.PersistentVolumeDeleteFailedTotal.WithLabelValues(string(mode), deleteType).Inc()
				cleaningLocalPVErr := fmt.Errorf("Error cleaning PV %q: %v", name, err.Error())
				d.RuntimeConfig.Recorder.Eventf(pv, v1.EventTypeWarning, common.EventVolumeFailedDelete, cleaningLocalPVErr.Error())
				klog.Error(err)
				continue
			}
		default:
			// Unknown PersistentVolumeReclaimPolicy
			d.RuntimeConfig.Recorder.Eventf(pv, v1.EventTypeWarning, "VolumeUnknownReclaimPolicy", "Volume has unrecognized PersistentVolumeReclaimPolicy")
		}
	}
}
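
// getVolMode resolves the volume mode (Filesystem or Block) of a PV by
// looking up its storage class in the discovery config and probing the
// corresponding path inside the container.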
func (d *Deleter) getVolMode(pv *v1.PersistentVolume) (v1.PersistentVolumeMode, error) {
	config, ok := d.DiscoveryMap[pv.Spec.StorageClassName]
	if !ok {
		return "", fmt.Errorf("Unknown storage class name %s", pv.Spec.StorageClassName)
	}
	mountPath, err := common.GetContainerPath(pv, config)
	if err != nil {
		return "", err
	}
	volMode, err := common.GetVolumeMode(d.VolUtil, mountPath)
	if err != nil {
		return "", err
	}
	return volMode, nil
}
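
// shouldRunJob reports whether cleanup should run as a Job: only block
// volumes are cleaned by Jobs, and only when job-based cleaning is enabled.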
func (d *Deleter) shouldRunJob(mode v1.PersistentVolumeMode) bool {
	return mode == v1.PersistentVolumeBlock && d.RuntimeConfig.UseJobForCleaning
}
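
// deletePV drives the cleanup state machine for one released PV: it exits if
// a cleanup is still in progress, deletes the PV object once a cleanup has
// completed successfully, and otherwise starts (or restarts) a cleanup as
// either a local process or a Job.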
func (d *Deleter) deletePV(pv *v1.PersistentVolume) error {
	if pv.Spec.Local == nil {
		return fmt.Errorf("Unsupported volume type")
	}

	config, ok := d.DiscoveryMap[pv.Spec.StorageClassName]
	if !ok {
		return fmt.Errorf("Unknown storage class name %s", pv.Spec.StorageClassName)
	}

	mountPath, err := common.GetContainerPath(pv, config)
	if err != nil {
		return err
	}
	volMode, err := d.getVolMode(pv)
	if err != nil {
		return fmt.Errorf("failed to get volume mode of path %q: %v", mountPath, err)
	}
	runjob := d.shouldRunJob(volMode)

	// Exit if cleaning is still in progress.
	if d.CleanupStatus.InProgress(pv.Name, runjob) {
		return nil
	}

	// Check if cleaning was just completed.
	state, startTime, err := d.CleanupStatus.RemoveStatus(pv.Name, runjob)
	if err != nil {
		return err
	}

	switch state {
	case CSSucceeded:
		// Found a completed cleaning entry
		klog.Infof("Deleting pv %s after successful cleanup", pv.Name)
		if err = d.APIUtil.DeletePV(pv.Name); err != nil {
			if !errors.IsNotFound(err) {
				d.RuntimeConfig.Recorder.Eventf(pv, v1.EventTypeWarning, common.EventVolumeFailedDelete,
					err.Error())
				return fmt.Errorf("Error deleting PV %q: %v", pv.Name, err.Error())
			}
		}
		mode := string(volMode)
		deleteType := metrics.DeleteTypeProcess
		if runjob {
			deleteType = metrics.DeleteTypeJob
		}
		metrics.PersistentVolumeDeleteTotal.WithLabelValues(mode, deleteType).Inc()
		if startTime != nil {
			var capacityBytes int64
			if capacity, ok := pv.Spec.Capacity[v1.ResourceStorage]; ok {
				capacityBytes = capacity.Value()
			}
			capacityBreakDown := metrics.CapacityBreakDown(capacityBytes)
			cleanupCommand := ""
			if len(config.BlockCleanerCommand) > 0 {
				cleanupCommand = config.BlockCleanerCommand[0]
			}
			metrics.PersistentVolumeDeleteDurationSeconds.WithLabelValues(mode, deleteType, capacityBreakDown, cleanupCommand).Observe(time.Since(*startTime).Seconds())
		}
		return nil
	case CSFailed:
		klog.Infof("Cleanup for pv %s failed. Restarting cleanup", pv.Name)
	case CSNotFound:
		klog.Infof("Start cleanup for pv %s", pv.Name)
	default:
		return fmt.Errorf("Unexpected state %d for pv %s", state, pv.Name)
	}

	if volMode == v1.PersistentVolumeBlock {
		if len(config.BlockCleanerCommand) < 1 {
			return fmt.Errorf("Blockcleaner command was empty for pv %q mountPath %s but mount dir is %s", pv.Name,
				mountPath, config.MountDir)
		}
	}

	if runjob {
		// Block volumes with job-based cleaning enabled are cleaned by a Job.
		return d.runJob(pv, volMode, mountPath, config)
	}

	return d.runProcess(pv, volMode, mountPath, config)
}
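
// runProcess marks the PV as being cleaned in the process table and runs the
// cleanup asynchronously in a goroutine.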
func (d *Deleter) runProcess(pv *v1.PersistentVolume, volMode v1.PersistentVolumeMode, mountPath string,
	config common.MountConfig) error {
	// Run as exec script.
	err := d.CleanupStatus.ProcTable.MarkRunning(pv.Name)
	if err != nil {
		return err
	}

	go d.asyncCleanPV(pv, volMode, mountPath, config)
	return nil
}
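
// asyncCleanPV runs the cleanup and records the terminal state (succeeded or
// failed) in the process table.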
func (d *Deleter) asyncCleanPV(pv *v1.PersistentVolume, volMode v1.PersistentVolumeMode, mountPath string,
	config common.MountConfig) {
	err := d.cleanPV(pv, volMode, mountPath, config)
	if err != nil {
		klog.Error(err)
		// Set process as failed.
		if err := d.CleanupStatus.ProcTable.MarkFailed(pv.Name); err != nil {
			klog.Error(err)
		}
		return
	}
	// Set process as succeeded.
	if err := d.CleanupStatus.ProcTable.MarkSucceeded(pv.Name); err != nil {
		klog.Error(err)
	}
}
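
// cleanPV dispatches to the filesystem or block cleaner after verifying that
// the path to be cleaned lives under the configured mount directory.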
func (d *Deleter) cleanPV(pv *v1.PersistentVolume, volMode v1.PersistentVolumeMode, mountPath string,
	config common.MountConfig) error {
	// Make absolutely sure here that we are not deleting anything outside of the mounted dir.
	if !strings.HasPrefix(mountPath, config.MountDir) {
		return fmt.Errorf("Unexpected error pv %q mountPath %s but mount dir is %s", pv.Name, mountPath,
			config.MountDir)
	}

	var err error
	switch volMode {
	case v1.PersistentVolumeFilesystem:
		err = d.cleanFilePV(pv, mountPath, config)
	case v1.PersistentVolumeBlock:
		err = d.cleanBlockPV(pv, mountPath, config)
	default:
		err = fmt.Errorf("Unexpected volume mode %q for deleting path %q", volMode, pv.Spec.Local.Path)
	}
	return err
}
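
// cleanFilePV removes the contents of a filesystem-mode volume's directory,
// leaving the directory itself in place for rediscovery.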
func (d *Deleter) cleanFilePV(pv *v1.PersistentVolume, mountPath string, config common.MountConfig) error {
	klog.Infof("Deleting PV file volume %q contents at hostpath %q, mountpath %q", pv.Name, pv.Spec.Local.Path,
		mountPath)

	hostPath := pv.Spec.Local.Path
	return d.VolUtil.DeleteContents(hostPath, mountPath)
}
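
// cleanBlockPV scrubs a block device by running the configured block cleaner
// command against it.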
func (d *Deleter) cleanBlockPV(pv *v1.PersistentVolume, blkdevPath string, config common.MountConfig) error {
	cleaningInfo := fmt.Errorf("Starting cleanup of Block PV %q, this may take a while", pv.Name)
	d.RuntimeConfig.Recorder.Eventf(pv, v1.EventTypeNormal, common.VolumeDelete, cleaningInfo.Error())
	klog.Infof("Deleting PV block volume %q device hostpath %q, mountpath %q", pv.Name, pv.Spec.Local.Path,
		blkdevPath)

	err := d.execScript(pv.Name, blkdevPath, config.BlockCleanerCommand[0], config.BlockCleanerCommand[1:]...)
	if err != nil {
		klog.Error(err)
		return err
	}
	klog.Infof("Completed cleanup of pv %q", pv.Name)

	return nil
}
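
// execScript runs the block cleaner executable with the device path exported
// in its environment (common.LocalPVEnv), streaming the command's stdout and
// stderr into the provisioner log.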
func (d *Deleter) execScript(pvName string, blkdevPath string, exe string, exeArgs ...string) error {
	cmd := exec.Command(exe, exeArgs...)
	cmd.Env = append(os.Environ(), fmt.Sprintf("%s=%s", common.LocalPVEnv, blkdevPath))

	var wg sync.WaitGroup
	// Wait for the stderr & stdout goroutines to drain before calling cmd.Wait().
	wg.Add(2)

	outReader, err := cmd.StdoutPipe()
	if err != nil {
		return err
	}
	go func() {
		defer wg.Done()
		outScanner := bufio.NewScanner(outReader)
		for outScanner.Scan() {
			outstr := outScanner.Text()
			klog.Infof("Cleanup pv %q: StdoutBuf - %q", pvName, outstr)
		}
	}()

	errReader, err := cmd.StderrPipe()
	if err != nil {
		return err
	}
	go func() {
		defer wg.Done()
		errScanner := bufio.NewScanner(errReader)
		for errScanner.Scan() {
			errstr := errScanner.Text()
			klog.Infof("Cleanup pv %q: StderrBuf - %q", pvName, errstr)
		}
	}()

	err = cmd.Start()
	if err != nil {
		return err
	}

	wg.Wait()
	err = cmd.Wait()
	if err != nil {
		return err
	}
	return nil
}

// runJob runs a cleaning job.
// The advantages of using a Job to do block cleaning (which is a process that can take several hours) are as follows:
//  1. By naming the job based on the specific name of the volume, one ensures that only one instance of a cleaning
//     job will be active for any given volume. Any attempt to create another will fail due to name collision. This
//     avoids any concurrent cleaning problems.
//  2. The above approach also ensures that we don't accidentally create a new PV when a cleaning job is in progress.
//     Even if a user accidentally deletes the PV, the presence of the cleaning job would prevent the provisioner from
//     attempting to re-create it. This would be the case even if the DaemonSet had two provisioners running on the same
//     host (which can sometimes happen as the DaemonSet controller follows "at least one" semantics).
//  3. Admins get transparency on what is going on with a released volume by just running kubectl commands
//     to check for any corresponding cleaning job for a given volume and looking into its progress or failure.
//
// To achieve these advantages, the provisioner names the cleaning job with a constant name based on the PV name.
// If a job completes successfully, then the job is deleted first and then the cleaned PV (to enable its rediscovery).
// A failed Job is left "as is" (after a few retries to execute) for admins to intervene/debug and resolve. This is the
// safest thing to do in this scenario, as it would be even in a non-Job based approach. Please note that for
// successful jobs, deleting the job also deletes its logs. This is probably an acceptable initial implementation, as
// the logs of a successful run are not as interesting. Long term, we might want to fetch the logs of successful Jobs
// too before deleting them, but for the initial implementation we will keep things simple and perhaps decide the
// enhancement based on user feedback.
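//
// A minimal sketch of the idempotency this naming buys (the exact name format
// lives in NewCleanupJob; "cleanup-"+pv.Name here is purely illustrative):
//
//	job := &batchv1.Job{ObjectMeta: metav1.ObjectMeta{Name: "cleanup-" + pv.Name, Namespace: ns}}
//	_, err := client.BatchV1().Jobs(ns).Create(ctx, job, metav1.CreateOptions{})
//	if errors.IsAlreadyExists(err) {
//		// A cleanup job for this volume is already active; nothing to do.
//	}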
func (d *Deleter) runJob(pv *v1.PersistentVolume, volMode v1.PersistentVolumeMode, mountPath string, config common.MountConfig) error {
	if d.JobContainerImage == "" {
		return fmt.Errorf("cannot run cleanup job without specifying job image name in the environment variable")
	}
	job, err := NewCleanupJob(pv, volMode, d.JobContainerImage, d.JobTolerations, d.Node.Name, d.Namespace, mountPath, config)
	if err != nil {
		return err
	}
	return d.RuntimeConfig.APIUtil.CreateJob(job)
}

// CleanupStatusTracker tracks cleanup operations that are either process-based or job-based.
type CleanupStatusTracker struct {
	ProcTable     ProcTable
	JobController JobController
}

// InProgress returns true if the cleaning for the specified PV is in progress.
func (c *CleanupStatusTracker) InProgress(pvName string, isJob bool) bool {
	if isJob {
		return c.JobController.IsCleaningJobRunning(pvName)
	}
	return c.ProcTable.IsRunning(pvName)
}

// RemoveStatus removes and returns the status and start time of a completed
// cleaning process. The method returns an error if the process has not yet completed.
func (c *CleanupStatusTracker) RemoveStatus(pvName string, isJob bool) (CleanupState, *time.Time, error) {
	if isJob {
		return c.JobController.RemoveJob(pvName)
	}
	return c.ProcTable.RemoveEntry(pvName)
}