Skip to content

Commit

Permalink
DynamoDB: Make Unlock key delete conditional on being old leader's (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
EdwinRobbins authored and Brian Kassouf committed Jun 3, 2019
1 parent 9757638 commit d931d70
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 13 deletions.
63 changes: 51 additions & 12 deletions physical/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func NewDynamoDBBackend(conf map[string]string, logger log.Logger) (physical.Bac
if dynamodbMaxRetryString == "" {
dynamodbMaxRetryString = conf["dynamodb_max_retries"]
}
var dynamodbMaxRetry int = aws.UseServiceDefaultRetries
var dynamodbMaxRetry = aws.UseServiceDefaultRetries
if dynamodbMaxRetryString != "" {
var err error
dynamodbMaxRetry, err = strconv.Atoi(dynamodbMaxRetryString)
Expand Down Expand Up @@ -571,10 +571,31 @@ func (l *DynamoDBLock) Unlock() error {
}

l.held = false
if err := l.backend.Delete(context.Background(), l.key); err != nil {
return err

// Conditionally delete after check that the key is actually this Vault's and
// not been already claimed by another leader
condition := "#identity = :identity"
deleteMyLock := &dynamodb.DeleteItemInput{
TableName: &l.backend.table,
ConditionExpression: &condition,
Key: map[string]*dynamodb.AttributeValue{
"Path": {S: aws.String(recordPathForVaultKey(l.key))},
"Key": {S: aws.String(recordKeyForVaultKey(l.key))},
},
ExpressionAttributeNames: map[string]*string{
"#identity": aws.String("Identity"),
},
ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
":identity": {B: []byte(l.identity)},
},
}
return nil

_, err := l.backend.client.DeleteItem(deleteMyLock)
if isConditionCheckFailed(err) {
err = nil
}

return err
}

// Value checks whether or not the lock is held by any instance of DynamoDBLock,
Expand Down Expand Up @@ -612,7 +633,7 @@ func (l *DynamoDBLock) tryToLock(stop, success chan struct{}, errors chan error)
if err, ok := err.(awserr.Error); ok {
// Don't report a condition check failure, this means that the lock
// is already being held.
if err.Code() != dynamodb.ErrCodeConditionalCheckFailedException {
if !isConditionCheckFailed(err) {
errors <- err
}
} else {
Expand All @@ -634,7 +655,12 @@ func (l *DynamoDBLock) periodicallyRenewLock(done chan struct{}) {
select {
case <-ticker.C:
// This should not renew the lock if the lock was deleted from under you.
l.updateItem(false)
err := l.updateItem(false)
if err != nil {
if !isConditionCheckFailed(err) {
l.backend.logger.Error("error renewing leadership lock", "error", err)
}
}
case <-done:
ticker.Stop()
return
Expand Down Expand Up @@ -665,8 +691,8 @@ func (l *DynamoDBLock) updateItem(createIfMissing bool) error {
_, err := l.backend.client.UpdateItem(&dynamodb.UpdateItemInput{
TableName: aws.String(l.backend.table),
Key: map[string]*dynamodb.AttributeValue{
"Path": &dynamodb.AttributeValue{S: aws.String(recordPathForVaultKey(l.key))},
"Key": &dynamodb.AttributeValue{S: aws.String(recordKeyForVaultKey(l.key))},
"Path": {S: aws.String(recordPathForVaultKey(l.key))},
"Key": {S: aws.String(recordKeyForVaultKey(l.key))},
},
UpdateExpression: aws.String("SET #value=:value, #identity=:identity, #expires=:expires"),
// If both key and path already exist, we can only write if
Expand All @@ -682,12 +708,13 @@ func (l *DynamoDBLock) updateItem(createIfMissing bool) error {
"#value": aws.String("Value"),
},
ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
":identity": &dynamodb.AttributeValue{B: []byte(l.identity)},
":value": &dynamodb.AttributeValue{B: []byte(l.value)},
":now": &dynamodb.AttributeValue{N: aws.String(strconv.FormatInt(now.UnixNano(), 10))},
":expires": &dynamodb.AttributeValue{N: aws.String(strconv.FormatInt(now.Add(l.ttl).UnixNano(), 10))},
":identity": {B: []byte(l.identity)},
":value": {B: []byte(l.value)},
":now": {N: aws.String(strconv.FormatInt(now.UnixNano(), 10))},
":expires": {N: aws.String(strconv.FormatInt(now.Add(l.ttl).UnixNano(), 10))},
},
})

return err
}

Expand Down Expand Up @@ -831,3 +858,15 @@ func unescapeEmptyPath(s string) string {
}
return s
}

// isConditionCheckFailed tests whether err is an ErrCodeConditionalCheckFailedException
// from the AWS SDK.
func isConditionCheckFailed(err error) bool {
if err != nil {
if err, ok := err.(awserr.Error); ok {
return err.Code() == dynamodb.ErrCodeConditionalCheckFailedException
}
}

return false
}
4 changes: 3 additions & 1 deletion vault/ha.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,9 @@ func (c *Core) waitForLeadership(newLeaderCh chan func(), manualStepDownCh, stop
c.logger.Error("clearing leader advertisement failed", "error", err)
}

c.heldHALock.Unlock()
if err := c.heldHALock.Unlock(); err != nil {
c.logger.Error("unlocking HA lock failed", "error", err)
}
c.heldHALock = nil
}

Expand Down

0 comments on commit d931d70

Please # to comment.