-
Notifications
You must be signed in to change notification settings - Fork 182
/
cnsvolumeoperationrequest.go
470 lines (428 loc) · 19 KB
/
cnsvolumeoperationrequest.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cnsvolumeoperationrequest
import (
"context"
"fmt"
"os"
"strings"
"sync"
"time"
"github.com/davecgh/go-spew/spew"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
csiconfig "sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/config"
"sigs.k8s.io/vsphere-csi-driver/v3/pkg/csi/service/logger"
csitypes "sigs.k8s.io/vsphere-csi-driver/v3/pkg/csi/types"
cnsvolumeoperationrequestconfig "sigs.k8s.io/vsphere-csi-driver/v3/pkg/internalapis/cnsvolumeoperationrequest/config"
cnsvolumeoprequestv1alpha1 "sigs.k8s.io/vsphere-csi-driver/v3/pkg/internalapis/cnsvolumeoperationrequest/v1alpha1"
k8s "sigs.k8s.io/vsphere-csi-driver/v3/pkg/kubernetes"
)
// Identifiers for the CnsVolumeOperationRequest CRD and the environment
// variable used to discover the namespace of the CSI driver.
const (
// CRDSingular represents the singular name of the cnsvolumeoperationrequest CRD.
CRDSingular = "cnsvolumeoperationrequest"
// CRDPlural represents the plural name of the cnsvolumeoperationrequest CRD.
CRDPlural = "cnsvolumeoperationrequests"
// EnvCSINamespace represents the environment variable which
// stores the namespace in which the CSI driver is running.
// getCSINamespace falls back to csiconfig.DefaultCSINamespace when this
// variable is unset or contains only whitespace.
EnvCSINamespace = "CSI_NAMESPACE"
)
// VolumeOperationRequest is an interface that supports handling idempotency
// in the CSI volume manager. An implementation persists the details of
// operations invoked on CNS and returns the persisted information to callers
// whenever it is requested.
type VolumeOperationRequest interface {
// GetRequestDetails returns the details of the operation on the volume
// that were persisted under the given name.
// Returns an error if any error is encountered while attempting to
// read the previously persisted information.
GetRequestDetails(ctx context.Context, name string) (*VolumeOperationRequestDetails, error)
// StoreRequestDetails persists the details of the operation taking
// place on the volume.
// Returns an error if any error is encountered. Clients must assume
// that the attempt to persist the information failed if an error is returned.
StoreRequestDetails(ctx context.Context, instance *VolumeOperationRequestDetails) error
// DeleteRequestDetails deletes the details of the operation on the volume
// that were persisted under the given name.
// The operationRequestStore implementation in this file treats a record
// that is already absent as a successful deletion.
DeleteRequestDetails(ctx context.Context, name string) error
}
// operationRequestStore implements the VolumeOperationRequest interface.
// This implementation persists the operation information as
// CnsVolumeOperationRequest custom resources via a client to the API server.
// Every read and write goes through k8sclient; this type adds no caching
// layer of its own.
type operationRequestStore struct {
// k8sclient is the API server client used for all Get, Create, Update,
// Delete and List calls on CnsVolumeOperationRequest instances.
k8sclient client.Client
}
// Package-level state shared by every user of the operation request store.
var (
// csiNamespace is the namespace in which CnsVolumeOperationRequest
// instances are read, created and deleted. It is assigned from
// getCSINamespace by InitVolumeOperationRequestInterface.
csiNamespace string
// operationRequestStoreInstance is the single store instance. It is created
// by the first successful call to InitVolumeOperationRequestInterface and
// returned by every later call.
operationRequestStoreInstance *operationRequestStore
// operationStoreInitLock serializes the initialization performed in
// InitVolumeOperationRequestInterface.
operationStoreInitLock = &sync.Mutex{}
// isPodVMOnStretchSupervisorFSSEnabled holds the feature-state value passed
// to the most recent successful InitVolumeOperationRequestInterface call.
// Storage quota details are only read and persisted when it is true.
isPodVMOnStretchSupervisorFSSEnabled bool
)
// InitVolumeOperationRequestInterface creates the CnsVolumeOperationRequest
// definition on the API server and returns an implementation of the
// VolumeOperationRequest interface. Clients are unaware of the implementation
// details used to read and persist volume operation details.
//
// The store is a process-wide singleton: the first successful call registers
// the CRD, builds the API server client and starts the background goroutine
// that periodically removes stale instances; later calls return the same
// instance. Every successful call refreshes the stored
// PodVMOnStretchedSupervisor feature state.
//
// Parameters:
//   - cleanupInterval: period, in minutes, between stale-instance clean ups.
//   - isBlockVolumeSnapshotEnabled: queried on every clean up run to decide
//     whether snapshot related instances are taken into account.
//   - isPodVMOnStretchSupervisorEnabled: whether storage quota details are
//     read and persisted.
//
// Returns an error if the CRD cannot be created, the kubeconfig cannot be
// obtained, or the client cannot be constructed.
func InitVolumeOperationRequestInterface(ctx context.Context, cleanupInterval int,
	isBlockVolumeSnapshotEnabled func() bool, isPodVMOnStretchSupervisorEnabled bool) (
	VolumeOperationRequest, error) {
	log := logger.GetLogger(ctx)

	operationStoreInitLock.Lock()
	defer operationStoreInitLock.Unlock()

	// Assign the package-level namespace only while holding the init lock so
	// that concurrent initializers do not race on it.
	csiNamespace = getCSINamespace()

	if operationRequestStoreInstance == nil {
		// Create CnsVolumeOperationRequest definition on API server.
		log.Info(
			"Creating CnsVolumeOperationRequest definition on API server and initializing VolumeOperationRequest instance",
		)
		err := k8s.CreateCustomResourceDefinitionFromManifest(ctx,
			cnsvolumeoperationrequestconfig.EmbedCnsVolumeOperationRequestFile,
			cnsvolumeoperationrequestconfig.EmbedCnsVolumeOperationRequestFileName)
		if err != nil {
			log.Errorf("failed to create CnsVolumeOperationRequest CRD with error: %v", err)
			return nil, err
		}

		// Get in cluster config for client to API server.
		config, err := k8s.GetKubeConfig(ctx)
		if err != nil {
			log.Errorf("failed to get kubeconfig with error: %v", err)
			return nil, err
		}

		// Create client to API server.
		k8sclient, err := k8s.NewClientForGroup(ctx, config, cnsvolumeoprequestv1alpha1.SchemeGroupVersion.Group)
		if err != nil {
			log.Errorf("failed to create k8sClient with error: %v", err)
			return nil, err
		}

		// Initialize the operationRequestStore implementation of the
		// VolumeOperationRequest interface.
		// NOTE: Currently there is only a single implementation of this
		// interface. Future implementations will need to modify this step.
		operationRequestStoreInstance = &operationRequestStore{
			k8sclient: k8sclient,
		}
		// The clean up loop runs for the lifetime of the process.
		go operationRequestStoreInstance.cleanupStaleInstances(cleanupInterval, isBlockVolumeSnapshotEnabled)
	}

	// Store PodVMOnStretchedSupervisor FSS value for later use.
	isPodVMOnStretchSupervisorFSSEnabled = isPodVMOnStretchSupervisorEnabled
	return operationRequestStoreInstance, nil
}
// GetRequestDetails returns the details of the operation on the volume
// that is persisted by the VolumeOperationRequest interface, by querying
// the API server for a CnsVolumeOperationRequest instance with the given
// name in the CSI namespace.
//
// Only the most recent entry of Status.LatestOperationDetails is reported.
// Storage quota details are included only when the
// PodVMOnStretchedSupervisor feature state is enabled and the instance
// carries them.
//
// Returns an error if the instance cannot be read from the API server or if
// it has no operation details. The error from the API server is returned
// unwrapped, so callers can differentiate NotFound errors if required.
func (or *operationRequestStore) GetRequestDetails(
	ctx context.Context,
	name string,
) (*VolumeOperationRequestDetails, error) {
	log := logger.GetLogger(ctx)
	instanceKey := client.ObjectKey{Name: name, Namespace: csiNamespace}
	log.Debugf("Getting CnsVolumeOperationRequest instance with name %s/%s", instanceKey.Namespace, instanceKey.Name)

	instance := &cnsvolumeoprequestv1alpha1.CnsVolumeOperationRequest{}
	if err := or.k8sclient.Get(ctx, instanceKey, instance); err != nil {
		return nil, err
	}
	log.Debugf("Found CnsVolumeOperationRequest instance %v", spew.Sdump(instance))

	// An instance is always stored with at least one operation entry, so an
	// empty list means the record is malformed.
	if len(instance.Status.LatestOperationDetails) == 0 {
		return nil, fmt.Errorf("length of LatestOperationDetails expected to be at least 1 if the instance exists")
	}

	// Callers only need to know about the last operation that was invoked on a volume.
	operationDetailsToReturn := instance.Status.LatestOperationDetails[len(instance.Status.LatestOperationDetails)-1]

	var quotaDetails *QuotaDetails
	if isPodVMOnStretchSupervisorFSSEnabled && instance.Status.StorageQuotaDetails != nil {
		// NOTE(review): StoreRequestDetails also persists AggregatedSnapshotSize
		// and SnapshotLatestOperationCompleteTime, but they are not copied back
		// here — confirm whether that is intentional.
		quotaDetails = &QuotaDetails{
			Reserved:         instance.Status.StorageQuotaDetails.Reserved,
			StorageClassName: instance.Status.StorageQuotaDetails.StorageClassName,
			StoragePolicyId:  instance.Status.StorageQuotaDetails.StoragePolicyId,
			Namespace:        instance.Status.StorageQuotaDetails.Namespace,
		}
	}

	return CreateVolumeOperationRequestDetails(instance.Spec.Name, instance.Status.VolumeID, instance.Status.SnapshotID,
			instance.Status.Capacity, quotaDetails, operationDetailsToReturn.TaskInvocationTimestamp,
			operationDetailsToReturn.TaskID, operationDetailsToReturn.VCenterServer, operationDetailsToReturn.OpID,
			operationDetailsToReturn.TaskStatus, operationDetailsToReturn.Error),
		nil
}
// StoreRequestDetails persists the details of the operation taking
// place on the volume by storing it on the API server.
//
// If no CnsVolumeOperationRequest instance named operationToStore.Name exists
// in the CSI namespace, one is created with the operation recorded both as
// the first and as the latest operation. Otherwise the existing instance is
// updated:
//   - VolumeID, SnapshotID and Capacity are overwritten;
//   - FirstOperationDetails is replaced only while it is still in progress
//     and its TaskID matches the incoming one or is empty;
//   - the newest in-progress entry of LatestOperationDetails whose TaskID
//     matches (or is empty) is replaced in place; if there is none, the
//     operation is appended and the oldest entry is dropped once the list
//     exceeds maxEntriesInLatestOperationDetails.
//
// Storage quota details are persisted only when the
// PodVMOnStretchedSupervisor feature state is enabled and the caller
// supplied them.
//
// Returns an error if operationToStore or its OperationDetails is nil, or if
// any API server call fails. Clients must assume that the attempt to persist
// the information failed if an error is returned.
func (or *operationRequestStore) StoreRequestDetails(
	ctx context.Context,
	operationToStore *VolumeOperationRequestDetails,
) error {
	log := logger.GetLogger(ctx)
	if operationToStore == nil {
		return logger.LogNewError(log, "cannot store empty operation")
	}
	if operationToStore.OperationDetails == nil {
		// OperationDetails is dereferenced below; fail cleanly instead of
		// panicking on a nil pointer.
		return logger.LogNewError(log, "cannot store operation with empty operation details")
	}
	log.Debugf("Storing CnsVolumeOperationRequest instance with spec %v", spew.Sdump(operationToStore))

	operationDetailsToStore := convertToCnsVolumeOperationRequestDetails(*operationToStore.OperationDetails)

	// Build the quota details once; a nil value means "leave quota details
	// untouched" on both the create and the update path.
	var quotaDetailsToStore *cnsvolumeoprequestv1alpha1.QuotaDetails
	if isPodVMOnStretchSupervisorFSSEnabled && operationToStore.QuotaDetails != nil {
		quotaDetailsToStore = &cnsvolumeoprequestv1alpha1.QuotaDetails{
			Reserved:                            operationToStore.QuotaDetails.Reserved,
			StoragePolicyId:                     operationToStore.QuotaDetails.StoragePolicyId,
			StorageClassName:                    operationToStore.QuotaDetails.StorageClassName,
			Namespace:                           operationToStore.QuotaDetails.Namespace,
			AggregatedSnapshotSize:              operationToStore.QuotaDetails.AggregatedSnapshotSize,
			SnapshotLatestOperationCompleteTime: operationToStore.QuotaDetails.SnapshotLatestOperationCompleteTime,
		}
	}

	instance := &cnsvolumeoprequestv1alpha1.CnsVolumeOperationRequest{}
	instanceKey := client.ObjectKey{Name: operationToStore.Name, Namespace: csiNamespace}

	if err := or.k8sclient.Get(ctx, instanceKey, instance); err != nil {
		if !apierrors.IsNotFound(err) {
			log.Errorf(
				"failed to get CnsVolumeOperationRequest instance %s/%s with error: %v",
				instanceKey.Namespace,
				instanceKey.Name,
				err,
			)
			return err
		}

		// Create new instance on API server if it doesnt exist.
		// Implies that this is the first time this object is
		// being stored.
		newInstance := &cnsvolumeoprequestv1alpha1.CnsVolumeOperationRequest{
			ObjectMeta: metav1.ObjectMeta{
				Name:      instanceKey.Name,
				Namespace: instanceKey.Namespace,
			},
			Spec: cnsvolumeoprequestv1alpha1.CnsVolumeOperationRequestSpec{
				Name: instanceKey.Name,
			},
			Status: cnsvolumeoprequestv1alpha1.CnsVolumeOperationRequestStatus{
				VolumeID:              operationToStore.VolumeID,
				SnapshotID:            operationToStore.SnapshotID,
				Capacity:              operationToStore.Capacity,
				FirstOperationDetails: *operationDetailsToStore,
				LatestOperationDetails: []cnsvolumeoprequestv1alpha1.OperationDetails{
					*operationDetailsToStore,
				},
			},
		}
		if quotaDetailsToStore != nil {
			newInstance.Status.StorageQuotaDetails = quotaDetailsToStore
		}

		if err := or.k8sclient.Create(ctx, newInstance); err != nil {
			log.Errorf(
				"failed to create CnsVolumeOperationRequest instance %s/%s with error: %v",
				instanceKey.Namespace,
				instanceKey.Name,
				err,
			)
			return err
		}
		log.Debugf(
			"Created CnsVolumeOperationRequest instance %s/%s with latest information for task with ID: %s",
			instanceKey.Namespace,
			instanceKey.Name,
			operationDetailsToStore.TaskID,
		)
		return nil
	}

	// Create a deep copy since we modify the object.
	updatedInstance := instance.DeepCopy()

	// Modify VolumeID, SnapshotID and Capacity.
	updatedInstance.Status.VolumeID = operationToStore.VolumeID
	updatedInstance.Status.SnapshotID = operationToStore.SnapshotID
	updatedInstance.Status.Capacity = operationToStore.Capacity
	if quotaDetailsToStore != nil {
		updatedInstance.Status.StorageQuotaDetails = quotaDetailsToStore
	}

	// Modify FirstOperationDetails only if it is still in progress and the
	// TaskID's match or the initial TaskID is empty.
	firstOp := instance.Status.FirstOperationDetails
	if firstOp.TaskStatus == TaskInvocationStatusInProgress &&
		(firstOp.TaskID == operationToStore.OperationDetails.TaskID || firstOp.TaskID == "") {
		updatedInstance.Status.FirstOperationDetails = *operationDetailsToStore
	}

	// If the task details already exist in the status, update them with the
	// latest information. The list is scanned newest-first so that the most
	// recent matching in-progress entry is the one replaced.
	operationExistsInList := false
	for index := len(instance.Status.LatestOperationDetails) - 1; index >= 0; index-- {
		operationDetail := instance.Status.LatestOperationDetails[index]
		if operationDetail.TaskStatus == TaskInvocationStatusInProgress &&
			(operationDetailsToStore.TaskID == operationDetail.TaskID || operationDetail.TaskID == "") {
			updatedInstance.Status.LatestOperationDetails[index] = *operationDetailsToStore
			operationExistsInList = true
			break
		}
	}

	if !operationExistsInList {
		// Append the latest task details to the local instance and
		// ensure the length of LatestOperationDetails does not exceed
		// maxEntriesInLatestOperationDetails by dropping the oldest entry.
		updatedInstance.Status.LatestOperationDetails = append(
			updatedInstance.Status.LatestOperationDetails,
			*operationDetailsToStore,
		)
		if len(updatedInstance.Status.LatestOperationDetails) > maxEntriesInLatestOperationDetails {
			updatedInstance.Status.LatestOperationDetails = updatedInstance.Status.LatestOperationDetails[1:]
		}
	}

	// Store the local instance on the API server.
	if err := or.k8sclient.Update(ctx, updatedInstance); err != nil {
		log.Errorf(
			"failed to update CnsVolumeOperationRequest instance %s/%s with error: %v",
			instanceKey.Namespace,
			instanceKey.Name,
			err,
		)
		return err
	}
	log.Debugf(
		"Updated CnsVolumeOperationRequest instance %s/%s with latest information for task with ID: %s",
		instanceKey.Namespace,
		instanceKey.Name,
		operationDetailsToStore.TaskID,
	)
	return nil
}
// DeleteRequestDetails removes the CnsVolumeOperationRequest instance with
// the given name from the CSI namespace on the API server.
// An instance that is already absent counts as success; any other failure
// is logged and returned to the caller.
func (or *operationRequestStore) DeleteRequestDetails(ctx context.Context, name string) error {
	log := logger.GetLogger(ctx)
	log.Debugf("Deleting CnsVolumeOperationRequest instance with name %s/%s",
		csiNamespace, name)

	// Only the object key (namespace and name) is needed to issue the delete.
	target := &cnsvolumeoprequestv1alpha1.CnsVolumeOperationRequest{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: csiNamespace,
			Name:      name,
		},
	}

	deleteErr := or.k8sclient.Delete(ctx, target)
	if deleteErr == nil || apierrors.IsNotFound(deleteErr) {
		return nil
	}

	log.Errorf("failed to delete CnsVolumeOperationRequest instance %s/%s with error: %v",
		csiNamespace, name, deleteErr)
	return deleteErr
}
// cleanupStaleInstances cleans up CnsVolumeOperationRequest instances for
// volumes (and, when block volume snapshots are enabled, snapshots) that are
// no longer present in the kubernetes cluster.
//
// It runs one clean up immediately and then one every cleanupInterval
// minutes, and never returns; it is meant to be started in its own goroutine.
// cleanupInterval must be positive because time.NewTicker panics otherwise.
//
// Each run:
//  1. lists all CnsVolumeOperationRequest instances;
//  2. collects the names and volume handles of PersistentVolumes provisioned
//     by this driver and, if isBlockVolumeSnapshotEnabled reports true, the
//     keys derived from this driver's VolumeSnapshotContents;
//  3. deletes every instance whose latest operation is not in progress and
//     whose name, stripped of its operation prefix, is not among those keys.
//
// A failure to list or to build a client abandons the current run only; the
// next tick tries again.
func (or *operationRequestStore) cleanupStaleInstances(cleanupInterval int, isBlockVolumeSnapshotEnabled func() bool) {
	ticker := time.NewTicker(time.Duration(cleanupInterval) * time.Minute)
	ctx, log := logger.GetNewContextWithLogger()
	log.Infof("CnsVolumeOperationRequest clean up interval is set to %d minutes", cleanupInterval)

	// The empty init clause makes the first iteration run right away; every
	// later iteration waits for the ticker.
	for ; true; <-ticker.C {
		log.Infof("Cleaning up stale CnsVolumeOperationRequest instances.")
		instanceMap := make(map[string]bool)

		cnsVolumeOperationRequestList := &cnsvolumeoprequestv1alpha1.CnsVolumeOperationRequestList{}
		err := or.k8sclient.List(ctx, cnsVolumeOperationRequestList)
		if err != nil {
			log.Errorf("failed to list CnsVolumeOperationRequests with error %v. Abandoning "+
				"CnsVolumeOperationRequests clean up ...", err)
			continue
		}

		k8sclient, err := k8s.NewClient(ctx)
		if err != nil {
			log.Errorf("failed to get k8sclient with error: %v. Abandoning CnsVolumeOperationRequests "+
				"clean up ...", err)
			continue
		}

		pvList, err := k8sclient.CoreV1().PersistentVolumes().List(ctx, metav1.ListOptions{})
		if err != nil {
			log.Errorf("failed to list PersistentVolumes with error %v. Abandoning "+
				"CnsVolumeOperationRequests clean up ...", err)
			continue
		}

		for _, pv := range pvList.Items {
			if pv.Spec.CSI != nil && pv.Spec.CSI.Driver == csitypes.Name {
				instanceMap[pv.Name] = true
				volumeHandle := pv.Spec.CSI.VolumeHandle
				// Handles containing "file" are recorded with ':' replaced by
				// '-' so they can match instance names.
				if strings.Contains(volumeHandle, "file") {
					volumeHandle = strings.ReplaceAll(volumeHandle, ":", "-")
				}
				instanceMap[volumeHandle] = true
			}
		}

		blockVolumeSnapshotEnabled := isBlockVolumeSnapshotEnabled()
		// Snapshot objects are only consulted when the FSS is enabled.
		if blockVolumeSnapshotEnabled {
			snapshotterClient, err := k8s.NewSnapshotterClient(ctx)
			if err != nil {
				log.Errorf("failed to get snapshotterClient with error: %v. Abandoning "+
					"CnsVolumeOperationRequests clean up ...", err)
				// Skip this run only; returning here would stop all future
				// clean ups for the lifetime of the process.
				continue
			}
			// the List API below ensures VolumeSnapshotContent CRD is installed and lists the existing
			// VolumeSnapshotContent CRs in cluster.
			vscList, err := snapshotterClient.SnapshotV1().VolumeSnapshotContents().List(ctx, metav1.ListOptions{})
			if err != nil {
				log.Errorf("failed to list VolumeSnapshotContents with error %v. Abandoning "+
					"CnsVolumeOperationRequests clean up ...", err)
				continue
			}
			for _, vsc := range vscList.Items {
				if vsc.Spec.Driver != csitypes.Name {
					continue
				}
				volumeHandle := vsc.Spec.Source.VolumeHandle
				if volumeHandle != nil {
					// CnsVolumeOperation instance for CreateSnapshot
					instanceMap[strings.TrimPrefix(vsc.Name, "snapcontent-")+"-"+*volumeHandle] = true
				}
				if vsc.Status != nil && vsc.Status.SnapshotHandle != nil {
					// CnsVolumeOperation instance for DeleteSnapshot
					instanceMap[strings.Replace(*vsc.Status.SnapshotHandle, "+", "-", 1)] = true
				}
			}
		}

		for _, instance := range cnsVolumeOperationRequestList.Items {
			// Never remove an instance whose latest operation is still running.
			latestOperationDetailsLength := len(instance.Status.LatestOperationDetails)
			if latestOperationDetailsLength != 0 &&
				instance.Status.LatestOperationDetails[latestOperationDetailsLength-1].TaskStatus ==
					TaskInvocationStatusInProgress {
				continue
			}

			trimmedName := staleLookupKeyForInstance(instance.Name, blockVolumeSnapshotEnabled)
			if _, ok := instanceMap[trimmedName]; !ok {
				err = or.DeleteRequestDetails(ctx, instance.Name)
				if err != nil {
					log.Errorf("failed to delete CnsVolumeOperationRequest instance %s with error %v",
						instance.Name, err)
				}
			}
		}
		log.Infof("Clean up of stale CnsVolumeOperationRequest complete.")
	}
}

// staleLookupKeyForInstance maps a CnsVolumeOperationRequest instance name to
// the key under which cleanupStaleInstances records the backing volume or
// snapshot. It returns the name unchanged for "pvc" names, strips the
// "delete-", "expand-", "snapshot-" or "deletesnapshot-" operation prefix
// otherwise, and returns "" when no known prefix applies. The snapshot
// prefixes are only recognized when blockVolumeSnapshotEnabled is true.
func staleLookupKeyForInstance(instanceName string, blockVolumeSnapshotEnabled bool) string {
	switch {
	case strings.HasPrefix(instanceName, "pvc"):
		return instanceName
	// "deletesnapshot" must be tested before "delete": every
	// "deletesnapshot-..." name also starts with "delete", so the more
	// specific case would otherwise never be reached.
	case blockVolumeSnapshotEnabled && strings.HasPrefix(instanceName, "deletesnapshot"):
		return strings.TrimPrefix(instanceName, "deletesnapshot-")
	case strings.HasPrefix(instanceName, "delete"):
		return strings.TrimPrefix(instanceName, "delete-")
	case strings.HasPrefix(instanceName, "expand"):
		return strings.TrimPrefix(instanceName, "expand-")
	case blockVolumeSnapshotEnabled && strings.HasPrefix(instanceName, "snapshot"):
		return strings.TrimPrefix(instanceName, "snapshot-")
	}
	return ""
}
// getCSINamespace reports the namespace the CSI driver runs in. It returns
// the value of the EnvCSINamespace environment variable as-is, unless that
// value is empty or whitespace-only, in which case it returns
// csiconfig.DefaultCSINamespace.
func getCSINamespace() string {
	if ns := os.Getenv(EnvCSINamespace); strings.TrimSpace(ns) != "" {
		return ns
	}
	return csiconfig.DefaultCSINamespace
}