Honor task desired state in allocator #435

Merged (1 commit) on Apr 26, 2016
manager/allocator/allocator_test.go (8 changes: 6 additions & 2 deletions)
@@ -178,7 +178,8 @@ func TestAllocator(t *testing.T) {
 		Status: &api.TaskStatus{
 			State: api.TaskStateNew,
 		},
-		ServiceID: "testServiceID2",
+		ServiceID:    "testServiceID2",
+		DesiredState: api.TaskStateRunning,
 		Spec: &api.TaskSpec{
 			Runtime: &api.TaskSpec_Container{
 				Container: &api.Container{
@@ -214,6 +215,7 @@ func TestAllocator(t *testing.T) {
 		Status: &api.TaskStatus{
 			State: api.TaskStateNew,
 		},
+		DesiredState: api.TaskStateRunning,
 		Spec: &api.TaskSpec{
 			Runtime: &api.TaskSpec_Container{
 				Container: &api.Container{
@@ -284,7 +286,8 @@ func TestAllocator(t *testing.T) {
 		Status: &api.TaskStatus{
 			State: api.TaskStateNew,
 		},
-		ServiceID: "testServiceID2",
+		DesiredState: api.TaskStateRunning,
+		ServiceID:    "testServiceID2",
 		Spec: &api.TaskSpec{
 			Runtime: &api.TaskSpec_Container{
 				Container: &api.Container{},
@@ -320,6 +323,7 @@ func TestAllocator(t *testing.T) {
 		Status: &api.TaskStatus{
 			State: api.TaskStateNew,
 		},
+		DesiredState: api.TaskStateRunning,
 		Spec: &api.TaskSpec{
 			Runtime: &api.TaskSpec_Container{
 				Container: &api.Container{},
manager/allocator/network.go (66 changes: 49 additions & 17 deletions)
@@ -106,6 +106,10 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
 
 	if _, err := a.store.Batch(func(batch state.Batch) error {
 		for _, t := range tasks {
+			if taskDead(t) {
+				continue
+			}
+
 			// No container or network configured. Not interested.
 			if t.Spec.GetContainer() == nil {
 				continue
@@ -115,6 +119,10 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
 				continue
 			}
 
+			if nc.nwkAllocator.IsTaskAllocated(t) {
+				continue
+			}
+
 			err := batch.Update(func(tx state.Tx) error {
 				return a.allocateTask(ctx, nc, tx, t)
 			})
@@ -218,6 +226,17 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
 	}
 }
 
+// taskRunning checks whether a task is either actively running, or in the
+// process of starting up.
+func taskRunning(t *api.Task) bool {
+	return t.DesiredState == api.TaskStateRunning && t.Status != nil && t.Status.State <= api.TaskStateRunning
+}
+
+// taskDead checks whether a task is not actively running as far as allocator purposes are concerned.
+func taskDead(t *api.Task) bool {
+	return t.DesiredState == api.TaskStateDead && t.Status != nil && t.Status.State > api.TaskStateRunning
Collaborator:
I think t.DesiredState == api.TaskStateDead is a sufficient condition. Does the allocator care about the observed state?

Contributor Author:
Yeah, I thought about that, but decided that we don't want to deallocate the network resources for the task until the task is actually no longer running on the node, because otherwise we might hand out the same IP address to more than one running container even though one of them might be going down.
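
To illustrate the distinction being made here, a small hypothetical sketch (simplified stand-in types, not the project's real api package) of how the taskDead predicate above treats a task that is still shutting down versus one that has already stopped:

// Hypothetical stand-ins for the api task types (not the real package),
// only to show how the taskDead check behaves.
package main

import "fmt"

type TaskState int

const (
	TaskStateNew TaskState = iota
	TaskStateAllocated
	TaskStateRunning
	TaskStateDead // assumed to order after RUNNING, as the > comparison in taskDead requires
)

type TaskStatus struct{ State TaskState }

type Task struct {
	DesiredState TaskState
	Status       *TaskStatus
}

// Same shape as the taskDead helper introduced in this PR.
func taskDead(t *Task) bool {
	return t.DesiredState == TaskStateDead && t.Status != nil && t.Status.State > TaskStateRunning
}

func main() {
	shuttingDown := &Task{DesiredState: TaskStateDead, Status: &TaskStatus{State: TaskStateRunning}}
	stopped := &Task{DesiredState: TaskStateDead, Status: &TaskStatus{State: TaskStateDead}}

	fmt.Println(taskDead(shuttingDown)) // false: the container may still be up and holding its IP
	fmt.Println(taskDead(stopped))      // true: safe to free the task's network resources
}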

Collaborator:
Makes sense. I just thought of this after adding the comment.

But doesn't this mean that if a node fails, we will never free the network resources associated with the tasks it was running? What's the right behavior here?

Contributor Author:
I am assuming that if the node fails, the task will be removed. That is why I am checking task dead or isDelete here: https://github.com/docker/swarm-v2/pull/435/files#diff-119d353212583d96a59cba8c82b80280R254 while deciding if I want to deallocate.

Collaborator:
Tasks from failed node won't be immediately deleted, to provide task history. Instead, they generally have DesiredState set to DEAD.

Contributor Author:
Hmm.. When will they be removed? I realized this as well when I was doing further testing with manager restart. Seems like we retain dead nodes and I was not handling that properly in doNetworkInit.

Collaborator:
I have a PR I'm going to open later today for this. The idea is that we will keep a certain number of old tasks per service instance, and then start deleting the oldest ones.
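
A minimal sketch of that retention idea, under the assumption that it works roughly as described (hypothetical types and names, not the actual follow-up PR): keep at most a fixed number of old tasks per service instance and delete the oldest ones beyond the limit.

package main

import (
	"fmt"
	"sort"
	"time"
)

// oldTask is a hypothetical record of a task that is no longer running.
type oldTask struct {
	ID        string
	StoppedAt time.Time
}

// pruneHistory keeps at most limit old tasks for one service instance and
// returns the IDs of the oldest tasks that should be deleted.
func pruneHistory(history []oldTask, limit int) []string {
	if len(history) <= limit {
		return nil
	}
	sort.Slice(history, func(i, j int) bool {
		return history[i].StoppedAt.Before(history[j].StoppedAt)
	})
	var doomed []string
	for _, t := range history[:len(history)-limit] {
		doomed = append(doomed, t.ID)
	}
	return doomed
}

func main() {
	now := time.Now()
	history := []oldTask{
		{ID: "t1", StoppedAt: now.Add(-3 * time.Hour)},
		{ID: "t2", StoppedAt: now.Add(-2 * time.Hour)},
		{ID: "t3", StoppedAt: now.Add(-1 * time.Hour)},
	}
	fmt.Println(pruneHistory(history, 2)) // [t1]: only the oldest task is pruned
}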

Collaborator:
As discussed offline, there is no perfect solution to this. The current approach in the PR is the most conservative, since it will favor leaking resources over reusing them in a dangerous way. This seems like the right place to start, but in the future we might have to iterate on the approach.

+}
+
 func (a *Allocator) doTaskAlloc(ctx context.Context, nc *networkContext, ev events.Event) {
 	var (
 		isDelete bool
@@ -237,6 +256,21 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, nc *networkContext, ev events.Event) {
 		return
 	}
 
+	// If the task has stopped running or it's being deleted then
+	// we should free the network resources associated with the
+	// task right away.
+	if taskDead(t) || isDelete {
+		if nc.nwkAllocator.IsTaskAllocated(t) {
+			if err := nc.nwkAllocator.DeallocateTask(t); err != nil {
+				log.G(ctx).Errorf("Failed freeing network resources for task %s: %v", t.ID, err)
+			}
+		}
+
+		// Cleanup any task references that might exist in unallocatedTasks
+		delete(nc.unallocatedTasks, t.ID)
+		return
+	}
+
 	// No container or network configured. Not interested.
 	if t.Spec.GetContainer() == nil {
 		return
@@ -251,8 +285,18 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, nc *networkContext, ev events.Event) {
 			}
 			return nil
 		}); err != nil {
-			log.G(ctx).Errorf("Failed to get service %s for task %s: %v", t.ServiceID, t.ID, err)
-			return
+			// If the task is running it is not normal to
+			// not be able to find the associated
+			// service. If the task is not running (task
+			// is either dead or the desired state is set
+			// to dead) then the service may not be
+			// available in store. But we still need to
+			// cleanup network resources associated with
+			// the task.
+			if taskRunning(t) && !isDelete {
+				log.G(ctx).Errorf("Event %T: Failed to get service %s for task %s state %s: %v", ev, t.ServiceID, t.ID, t.Status.State, err)
+				return
+			}
 		}
 	}
 
@@ -303,22 +347,8 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, nc *networkContext, ev events.Event) {
 
 	if !nc.nwkAllocator.IsTaskAllocated(t) ||
 		(s != nil && s.Spec.Endpoint != nil && !nc.nwkAllocator.IsServiceAllocated(s)) {
-		if isDelete {
-			delete(nc.unallocatedTasks, t.ID)
-			return
-		}
-
 		nc.unallocatedTasks[t.ID] = t
 		return
 	}
-
-	// If the task has stopped running or it's being deleted then
-	// we should free the network resources associated with the
-	// task.
-	if t.Status.State > api.TaskStateRunning || isDelete {
-		if err := nc.nwkAllocator.DeallocateTask(t); err != nil {
-			log.G(ctx).Errorf("Failed freeing network resources for task %s: %v", t.ID, err)
-		}
-	}
 }

@@ -413,7 +443,9 @@ func (a *Allocator) allocateTask(ctx context.Context, nc *networkContext, tx sta
 	// Update the network allocations and moving to
 	// ALLOCATED state on top of the latest store state.
 	if a.taskAllocateVote(networkVoter, t.ID) {
-		storeT.Status.State = api.TaskStateAllocated
+		if storeT.Status.State < api.TaskStateAllocated {
+			storeT.Status.State = api.TaskStateAllocated
+		}
 	}
 
 	storeT.Networks = t.Networks