Skip to content
This repository was archived by the owner on Apr 24, 2024. It is now read-only.

Node: initial commits for node server #11

Merged
merged 2 commits into from
Dec 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ require (
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f // indirect
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e // indirect
google.golang.org/genproto v0.0.0-20201002142447-3860012362da // indirect
google.golang.org/grpc v1.32.0 // indirect
google.golang.org/grpc v1.32.0
gopkg.in/ini.v1 v1.61.0 // indirect
k8s.io/api v0.19.2
k8s.io/apimachinery v0.19.2
k8s.io/client-go v0.19.2
k8s.io/klog v1.0.0
k8s.io/klog/v2 v2.3.0 // indirect
k8s.io/kube-openapi v0.0.0-20200923155610-8b5066479488 // indirect
k8s.io/utils v0.0.0-20200912215256-4140de9c8800 // indirect
k8s.io/utils v0.0.0-20200912215256-4140de9c8800
sigs.k8s.io/controller-runtime v0.6.3 // indirect
)
219 changes: 211 additions & 8 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,27 @@ package node

import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"

"github.com/container-object-storage-interface/api/apis/objectstorage.k8s.io/v1alpha1"
cs "github.com/container-object-storage-interface/api/clientset/typed/objectstorage.k8s.io/v1alpha1"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog"
"k8s.io/utils/mount"
)

var _ csi.NodeServer = &NodeServer{}
const protocolFileName string = `protocolConn.json`
var getError = func(t, n string, e error) error { return fmt.Errorf("failed to get <%s>%s: %v", t, n, e) }

func NewNodeServer(driverName, nodeID string, c cs.ObjectstorageV1alpha1Client, kube kubernetes.Interface) csi.NodeServer {
return &NodeServer{
Expand All @@ -20,6 +34,12 @@ func NewNodeServer(driverName, nodeID string, c cs.ObjectstorageV1alpha1Client,
}
}

// logErr should be called at the interface method scope, prior to returning errors to the gRPC client.
func logErr(e error) error {
klog.Error(e)
return e
}

// NodeServer implements the NodePublishVolume and NodeUnpublishVolume methods
// of the csi.NodeServer interface and GetPluginCapabilities, GetPluginInfo, and
// Probe of the IdentityServer interface.
Expand All @@ -32,35 +52,214 @@ type NodeServer struct {
}

func (n NodeServer) getBAR(barName, barNs string) (*v1alpha1.BucketAccessRequest, error) {
panic("implement me")
klog.Infof("getting bucketAccessRequest %q", fmt.Sprintf("%s/%s", barNs, barName))
bar, err := n.cosiClient.BucketAccessRequests(barNs).Get(n.ctx, barName, metav1.GetOptions{})
if err != nil || bar == nil || !bar.Status.AccessGranted {
return nil, logErr(getError("bucketAccessRequest", fmt.Sprintf("%s/%s", barNs, barName), err))
}
if len(bar.Spec.BucketRequestName) == 0 {
return nil, logErr(fmt.Errorf("bucketAccessRequest.Spec.BucketRequestName unset"))
}
return bar, nil
}

func (n NodeServer) getBA(baName string) (*v1alpha1.BucketAccess, error) {
panic("implement me")
klog.Infof("getting bucketAccess %q", fmt.Sprintf("%s", baName))
ba, err := n.cosiClient.BucketAccesses().Get(n.ctx, baName, metav1.GetOptions{})
if err != nil || ba == nil || !ba.Status.AccessGranted {
return nil, logErr(getError("bucketAccess", fmt.Sprintf("%s", baName), err))
}
return ba, nil
}

func (n NodeServer) getBR(brName, brNs string) (*v1alpha1.BucketRequest, error) {
panic("implement me")
klog.Infof("getting bucketRequest %q", brName)
br, err := n.cosiClient.BucketRequests(brNs).Get(n.ctx, brName, metav1.GetOptions{})
if err != nil || br == nil || !br.Status.BucketAvailable {
return nil, logErr(getError("bucketRequest", fmt.Sprintf("%s/%s", brNs, brName), err))
}
return br, nil
}

func (n NodeServer) getB(bName string) (*v1alpha1.Bucket, error) {
panic("implement me")
klog.Infof("getting bucket %q", bName)
// is BucketInstanceName the correct field, or should it be BucketClass
bkt, err := n.cosiClient.Buckets().Get(n.ctx, bName, metav1.GetOptions{})
if err != nil || bkt == nil || !bkt.Status.BucketAvailable {
return nil, logErr(getError("bucket", bName, err))
}
return bkt, nil
}

func (n NodeServer) NodeStageVolume(ctx context.Context, request *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
panic("implement me")
klog.Infof("NodeStageVolume: volId: %v, targetPath: %v\n", request.GetVolumeId(), request.StagingTargetPath)

name, ns, err := parseVolumeContext(request.VolumeContext)
if err != nil {
return nil, err
}

pod, err := n.kubeClient.CoreV1().Pods(ns).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return nil, logErr(getError("pod", fmt.Sprintf("%s/%s", ns, name), err))
}

barName, barNs, err := parsePod(pod, n.name)

if err != nil {
return nil, err
}
bar, err := n.getBAR(barName, barNs)
if err != nil {
return nil, err
}
ba, err := n.getBA(bar.Spec.BucketAccessName)
if err != nil {
return nil, err
}
br, err := n.getBR(bar.Spec.BucketRequestName, barNs)
if err != nil {
return nil, err
}

bkt, err := n.getB(br.Spec.BucketInstanceName)
if err != nil {
return nil, err
}
secret, err := n.kubeClient.CoreV1().Secrets(barNs).Get(ctx, ba.Spec.MintedSecretName, metav1.GetOptions{})
if err != nil {
return nil, logErr(getError("pod", fmt.Sprintf("%s/%s", barNs, ba.Spec.MintedSecretName), err))
}
var protocolConnection interface{}
switch bkt.Spec.Protocol.ProtocolName {
case v1alpha1.ProtocolNameS3:
protocolConnection = bkt.Spec.Protocol.S3
case v1alpha1.ProtocolNameAzure:
protocolConnection = bkt.Spec.Protocol.AzureBlob
case v1alpha1.ProtocolNameGCS:
protocolConnection = bkt.Spec.Protocol.GCS
case "":
err = fmt.Errorf("bucket protocol not signature")
default:
err = fmt.Errorf("unrecognized protocol %q, unable to extract connection data", bkt.Spec.Protocol)
}

if err != nil {
return nil, logErr(err)
}
klog.Infof("bucket %q has protocol %q", bkt.Name, bkt.Spec.Protocol)

data := make(map[string]interface{})
data["protocol"] = protocolConnection
data["connection"] = secret.Data

protoData, err := json.Marshal(data)
if err != nil {
return nil, logErr(fmt.Errorf("error marshalling protocol: %v", err))
}

target := filepath.Join(request.StagingTargetPath, protocolFileName)
klog.Infof("creating conn file: %s", target)
f, err := os.Open(target)
if err != nil {
return nil, logErr(fmt.Errorf("error creating file: %s: %v", target, err))
}
defer f.Close()
_, err = f.Write(protoData)
if err != nil {
return nil, logErr(fmt.Errorf("unable to write to file: %v", err))
}
return &csi.NodeStageVolumeResponse{}, nil
}

const (
podNameKey = "csi.storage.k8s.io/pod.name"
podNamespaceKey = "csi.storage.k8s.io/pod.namespace"
barNameKey = "bar-name"
barNamespaceKey = "bar-namespace"
)


func parseValue(key string, ctx map[string]string) (string, error) {
value, ok := ctx[key]
if !ok {
return "", fmt.Errorf("required volume context key unset: %v", key)
}
klog.Infof("got value: %v", value)
return value, nil
}

func parseVolumeContext(volCtx map[string]string) (name, ns string, err error) {
klog.Info("parsing bucketAccessRequest namespace/name from volume context")

name, err = parseValue(podNameKey, volCtx)
if err != nil {
return "", "", err
}

ns, err = parseValue(podNamespaceKey, volCtx)
if err != nil {
return "", "", err
}

return
}

func parsePod(pod *v1.Pod, driverName string) (name, ns string, err error) {
klog.Info("parsing bucketAccessRequest namespace/name from pod")

for _, v := range pod.Spec.Volumes {
if v.CSI != nil && v.CSI.Driver == driverName {
name, ok := v.CSI.VolumeAttributes[barNameKey]
if !ok {
return "", "", errors.New("invalid BAR Name")
}
namespace, ok := v.CSI.VolumeAttributes[barNamespaceKey]
if !ok {
return "", "", errors.New("invalid BAR Namespace")
}
return name, namespace, nil
}
}

return "", "", nil
}

func (n NodeServer) NodeUnstageVolume(ctx context.Context, request *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
panic("implement me")
}

func (n NodeServer) NodePublishVolume(ctx context.Context, request *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
panic("implement me")
vID := request.GetVolumeId()
stagingTargetPath := request.GetStagingTargetPath()
targetPath := request.GetTargetPath()

if vID == "" {
return nil, status.Error(codes.InvalidArgument, "volume ID missing in request")
}

if err := os.MkdirAll(targetPath, 0755); err != nil {
return nil, status.Errorf(codes.Internal, "Stage Volume Failed: %v", err)
}

if err := mount.New("").Mount(stagingTargetPath, targetPath, "", []string{"bind"}); err != nil {
return nil, status.Errorf(codes.Internal, "Publish Volume Mount Failed: %v", err)
}

return &csi.NodePublishVolumeResponse{}, nil
}

func (n NodeServer) NodeUnpublishVolume(ctx context.Context, request *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
panic("implement me")
klog.Infof("NodeUnpublishVolume: volId: %v, targetPath: %v\n", request.GetVolumeId(), request.GetTargetPath())
target := filepath.Join(request.TargetPath, protocolFileName)
err := os.RemoveAll(target)
if err != nil {
if os.IsNotExist(err) {
return nil, nil
}
return nil, logErr(fmt.Errorf("unable to remove file %s: %v", target, err))
}
return &csi.NodeUnpublishVolumeResponse{}, nil
}

func (n NodeServer) NodeGetVolumeStats(ctx context.Context, request *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
Expand All @@ -76,5 +275,9 @@ func (n NodeServer) NodeGetCapabilities(ctx context.Context, request *csi.NodeGe
}

func (n NodeServer) NodeGetInfo(ctx context.Context, request *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
panic("implement me")
klog.Infof("NodeGetInfo()")
resp := &csi.NodeGetInfoResponse{
NodeId: n.nodeID,
}
return resp, nil
}