Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Add azure blob scaler #514

Merged
merged 2 commits into from
Jan 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/handler/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,8 @@ func (h *ScaleHandler) getScaler(name, namespace, triggerType string, resolvedEn
return scalers.NewStanScaler(resolvedEnv, triggerMetadata)
case "huawei-cloudeye":
return scalers.NewHuaweiCloudeyeScaler(triggerMetadata, authParams)
case "azure-blob":
return scalers.NewAzureBlobScaler(resolvedEnv, triggerMetadata, authParams, podIdentity)
default:
return nil, fmt.Errorf("no scaler found for type: %s", triggerType)
}
Expand Down
60 changes: 60 additions & 0 deletions pkg/scalers/azure_blob.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package scalers

import (
"context"
"fmt"
"net/url"

"github.com/Azure/azure-storage-blob-go/azblob"
)

// GetAzureBlobListLength returns the count of the blobs in blob container in int
func GetAzureBlobListLength(ctx context.Context, podIdentity string, connectionString, blobContainerName string, accountName string, blobDelimiter string, blobPrefix string) (int, error) {

var credential azblob.Credential
var listBlobsSegmentOptions azblob.ListBlobsSegmentOptions
var err error

if podIdentity == "" || podIdentity == "none" {

var accountKey string

_, accountName, accountKey, _, err = ParseAzureStorageConnectionString(connectionString)

if err != nil {
return -1, err
}

credential, err = azblob.NewSharedKeyCredential(accountName, accountKey)
if err != nil {
return -1, err
}
} else if podIdentity == "azure" {
token, err := getAzureADPodIdentityToken("https://storage.azure.com/")
if err != nil {
azureBlobLog.Error(err, "Error fetching token cannot determine blob list count")
return -1, nil
}

credential = azblob.NewTokenCredential(token.AccessToken, nil)
} else {
return -1, fmt.Errorf("Azure blobs doesn't support %s pod identity type", podIdentity)

}

if blobPrefix != "" {
listBlobsSegmentOptions.Prefix = blobPrefix
}

p := azblob.NewPipeline(credential, azblob.PipelineOptions{})
u, _ := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net", accountName))
serviceURL := azblob.NewServiceURL(*u, p)
containerURL := serviceURL.NewContainerURL(blobContainerName)

props, err := containerURL.ListBlobsHierarchySegment(ctx, azblob.Marker{} , blobDelimiter, listBlobsSegmentOptions)
if err != nil {
return -1, err
}

return len(props.Segment.BlobItems) , nil
}
184 changes: 184 additions & 0 deletions pkg/scalers/azure_blob_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package scalers

import (
"context"
"fmt"
"strconv"

v2beta1 "k8s.io/api/autoscaling/v2beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

const (
blobCountMetricName = "blobCount"
defaultTargetBlobCount = 5
defaultBlobDelimiter = "/"
defaultBlobPrefix = ""
defaultBlobConnectionSetting = "AzureWebJobsStorage"
)

type azureBlobScaler struct {
metadata *azureBlobMetadata
podIdentity string
}

type azureBlobMetadata struct {
targetBlobCount int
blobContainerName string
blobDelimiter string
blobPrefix string
connection string
useAAdPodIdentity bool
accountName string
}

var azureBlobLog = logf.Log.WithName("azure_blob_scaler")

// NewAzureBlobScaler creates a new azureBlobScaler
func NewAzureBlobScaler(resolvedEnv, metadata, authParams map[string]string, podIdentity string) (Scaler, error) {
meta, podIdentity, err := parseAzureBlobMetadata(metadata, resolvedEnv, authParams, podIdentity)
if err != nil {
return nil, fmt.Errorf("error parsing azure blob metadata: %s", err)
}

return &azureBlobScaler{
metadata: meta,
podIdentity: podIdentity,
}, nil
}

func parseAzureBlobMetadata(metadata, resolvedEnv, authParams map[string]string, podAuth string) (*azureBlobMetadata, string, error) {
meta := azureBlobMetadata{}
meta.targetBlobCount = defaultTargetBlobCount
meta.blobDelimiter = defaultBlobDelimiter
meta.blobPrefix = defaultBlobPrefix

if val, ok := metadata[blobCountMetricName]; ok {
blobCount, err := strconv.Atoi(val)
if err != nil {
azureBlobLog.Error(err, "Error parsing azure blob metadata", "blobCountMetricName", blobCountMetricName)
return nil, "", fmt.Errorf("Error parsing azure blob metadata %s: %s", blobCountMetricName, err.Error())
}

meta.targetBlobCount = blobCount
}

if val, ok := metadata["blobContainerName"]; ok && val != "" {
meta.blobContainerName = val
} else {
return nil, "", fmt.Errorf("no blobContainerName given")
}

if val, ok := metadata["blobDelimiter"]; ok {
if val != "" {
meta.blobDelimiter = val
}
}

if val, ok := metadata["blobPrefix"]; ok {
if val != "" {
meta.blobPrefix = val + meta.blobDelimiter
}
}
// before triggerAuthentication CRD, pod identity was configured using this property
if val, ok := metadata["useAAdPodIdentity"]; ok && podAuth == "" {
if val == "true" {
podAuth = "azure"
}
}

// If the Use AAD Pod Identity is not present, or set to "none"
// then check for connection string
if podAuth == "" || podAuth == "none" {
// Azure Blob Scaler expects a "connection" parameter in the metadata
// of the scaler or in a TriggerAuthentication object
connection := authParams["connection"]
if connection != "" {
// Found the connection in a parameter from TriggerAuthentication
meta.connection = connection
} else {
connectionSetting := defaultBlobConnectionSetting
if val, ok := metadata["connection"]; ok && val != "" {
connectionSetting = val
}

if val, ok := resolvedEnv[connectionSetting]; ok {
meta.connection = val
} else {
return nil, "", fmt.Errorf("no connection setting given")
}
}
} else if podAuth == "azure" {
// If the Use AAD Pod Identity is present then check account name
if val, ok := metadata["accountName"]; ok && val != "" {
meta.accountName = val
} else {
return nil, "", fmt.Errorf("no accountName given")
}
} else {
return nil, "", fmt.Errorf("pod identity %s not supported for azure storage blobs", podAuth)
}

return &meta, podAuth, nil
}

// GetScaleDecision is a func
func (s *azureBlobScaler) IsActive(ctx context.Context) (bool, error) {
length, err := GetAzureBlobListLength(
ctx,
s.podIdentity,
s.metadata.connection,
s.metadata.blobContainerName,
s.metadata.accountName,
s.metadata.blobDelimiter,
s.metadata.blobPrefix,
)

if err != nil {
azureBlobLog.Error(err, "error)")
return false, err
}

return length > 0, nil
}

func (s *azureBlobScaler) Close() error {
return nil
}

func (s *azureBlobScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec {
targetBlobCount := resource.NewQuantity(int64(s.metadata.targetBlobCount), resource.DecimalSI)
externalMetric := &v2beta1.ExternalMetricSource{MetricName: blobCountMetricName, TargetAverageValue: targetBlobCount}
metricSpec := v2beta1.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2beta1.MetricSpec{metricSpec}
}

//GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
func (s *azureBlobScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
bloblen, err := GetAzureBlobListLength(
ctx,
s.podIdentity,
s.metadata.connection,
s.metadata.blobContainerName,
s.metadata.accountName,
s.metadata.blobDelimiter,
s.metadata.blobPrefix,
)

if err != nil {
azureBlobLog.Error(err, "error getting blob list length")
return []external_metrics.ExternalMetricValue{}, err
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(bloblen), resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}
84 changes: 84 additions & 0 deletions pkg/scalers/azure_blob_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package scalers

import (
"context"
"strings"
"testing"
)

func TestGetBlobLength(t *testing.T) {
length, err := GetAzureBlobListLength(context.TODO(), "", "", "blobContainerName", "", "","")
if length != -1 {
t.Error("Expected length to be -1, but got", length)
}

if err == nil {
t.Error("Expected error for empty connection string, but got nil")
}

if !strings.Contains(err.Error(), "parse storage connection string") {
t.Error("Expected error to contain parsing error message, but got", err.Error())
}

length, err = GetAzureBlobListLength(context.TODO(), "", "DefaultEndpointsProtocol=https;AccountName=name;AccountKey=key==;EndpointSuffix=core.windows.net", "blobContainerName", "", "","")

if length != -1 {
t.Error("Expected length to be -1, but got", length)
}

if err == nil {
t.Error("Expected error for empty connection string, but got nil")
}

if !strings.Contains(err.Error(), "illegal base64") {
t.Error("Expected error to contain base64 error message, but got", err.Error())
}
}

var testAzBlobResolvedEnv = map[string]string{
"CONNECTION": "SAMPLE",
}

type parseAzBlobMetadataTestData struct {
metadata map[string]string
isError bool
resolvedEnv map[string]string
authParams map[string]string
podIdentity string
}

var testAzBlobMetadata = []parseAzBlobMetadataTestData{
// nothing passed
{map[string]string{}, true, testAzBlobResolvedEnv, map[string]string{}, ""},
// properly formed
{map[string]string{"connection": "CONNECTION", "blobContainerName": "sample", "blobCount": "5", "blobDelimiter": "/", "blobPrefix": "blobsubpath"}, false, testAzBlobResolvedEnv, map[string]string{}, ""},
// Empty blobcontainerName
{map[string]string{"connection": "CONNECTION", "blobContainerName": ""}, true, testAzBlobResolvedEnv, map[string]string{}, ""},
// improperly formed blobCount
{map[string]string{"connection": "CONNECTION", "blobContainerName": "sample", "blobCount": "AA"}, true, testAzBlobResolvedEnv, map[string]string{}, ""},
// podIdentity = azure with account name
{map[string]string{"accountName": "sample_acc", "blobContainerName": "sample_container"}, false, testAzBlobResolvedEnv, map[string]string{}, "azure"},
// podIdentity = azure without account name
{map[string]string{"accountName": "", "blobContainerName": "sample_container"}, true, testAzBlobResolvedEnv, map[string]string{}, "azure"},
// podIdentity = azure without blob container name
{map[string]string{"accountName": "sample_acc", "blobContainerName": ""}, true, testAzBlobResolvedEnv, map[string]string{}, "azure"},
// connection from authParams
{map[string]string{"blobContainerName": "sample_container", "blobCount": "5"}, false, testAzBlobResolvedEnv, map[string]string{"connection": "value"}, "none"},

}

func TestAzBlobParseMetadata(t *testing.T) {
for _, testData := range testAzBlobMetadata {
_, podIdentity, err := parseAzureBlobMetadata(testData.metadata, testData.resolvedEnv, testData.authParams, testData.podIdentity)
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Errorf("Expected error but got success. testData: %v", testData)
}
if testData.podIdentity != "" && testData.podIdentity != podIdentity && err == nil {
t.Error("Expected success but got error: podIdentity value is not returned as expected")

}
}
}
Loading