Skip to content

Commit

Permalink
allocator: Fix panic when allocations happen at init time
Browse files Browse the repository at this point in the history
a.netCtx is initialized too late, so if allocations happen as part of
doNetworkInit, a nil pointer dereference will cause a panic.

Initialize a.netCtx earlier and use a.netCtx directly in member
functions instead of passing the network context separately, so there is
no confusion about which to use.

Also change allocator.go to have separate entries in the waitgroup for
initialization and actually running the allocator, and defer `Done` for
both. This should prevent a panic like this from leading to a deadlock,
since the deferred `Done` will be reached.

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
  • Loading branch information
aaronlehmann committed Oct 17, 2016
1 parent 0424477 commit a8066c1
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 29 deletions.
4 changes: 3 additions & 1 deletion manager/allocator/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ func (a *Allocator) Run(ctx context.Context) error {
aaCopy := aa
actor := func() error {
wg.Add(1)
defer wg.Done()

// init might return an allocator specific context
// which is a child of the passed in context to hold
// allocator specific state
Expand All @@ -133,10 +135,10 @@ func (a *Allocator) Run(ctx context.Context) error {
// if we are failing in the init of
// this allocator.
aa.cancel()
wg.Done()
return err
}

wg.Add(1)
go func() {
defer wg.Done()
a.run(ctx, aaCopy)
Expand Down
77 changes: 49 additions & 28 deletions manager/allocator/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type networkContext struct {
unallocatedNetworks map[string]*api.Network
}

func (a *Allocator) doNetworkInit(ctx context.Context) error {
func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
na, err := networkallocator.New()
if err != nil {
return err
Expand All @@ -81,6 +81,13 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
unallocatedNetworks: make(map[string]*api.Network),
ingressNetwork: newIngressNetwork(),
}
a.netCtx = nc
defer func() {
// Clear a.netCtx if initialization was unsuccessful.
if err != nil {
a.netCtx = nil
}
}()

// Check if we have the ingress network. If not found create
// it before reading all network objects for allocation.
Expand Down Expand Up @@ -125,7 +132,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
// that we can get the preferred subnet for ingress
// network.
if !na.IsAllocated(nc.ingressNetwork) {
if err := a.allocateNetwork(ctx, nc, nc.ingressNetwork); err != nil {
if err := a.allocateNetwork(ctx, nc.ingressNetwork); err != nil {
log.G(ctx).WithError(err).Error("failed allocating ingress network during init")
}

Expand Down Expand Up @@ -155,7 +162,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
continue
}

if err := a.allocateNetwork(ctx, nc, n); err != nil {
if err := a.allocateNetwork(ctx, n); err != nil {
log.G(ctx).WithError(err).Errorf("failed allocating network %s during init", n.ID)
}
}
Expand All @@ -179,7 +186,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
}

node.Attachment.Network = nc.ingressNetwork.Copy()
if err := a.allocateNode(ctx, nc, node); err != nil {
if err := a.allocateNode(ctx, node); err != nil {
log.G(ctx).WithError(err).Errorf("Failed to allocate network resources for node %s during init", node.ID)
}
}
Expand All @@ -198,7 +205,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
continue
}

if err := a.allocateService(ctx, nc, s); err != nil {
if err := a.allocateService(ctx, s); err != nil {
log.G(ctx).WithError(err).Errorf("failed allocating service %s during init", s.ID)
}
}
Expand Down Expand Up @@ -260,7 +267,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
}

err := batch.Update(func(tx store.Tx) error {
_, err := a.allocateTask(ctx, nc, tx, t)
_, err := a.allocateTask(ctx, tx, t)
return err
})
if err != nil {
Expand All @@ -274,7 +281,6 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error {
return err
}

a.netCtx = nc
return nil
}

Expand All @@ -288,7 +294,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
break
}

if err := a.allocateNetwork(ctx, nc, n); err != nil {
if err := a.allocateNetwork(ctx, n); err != nil {
log.G(ctx).WithError(err).Errorf("Failed allocation for network %s", n.ID)
break
}
Expand All @@ -309,7 +315,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
break
}

if err := a.allocateService(ctx, nc, s); err != nil {
if err := a.allocateService(ctx, s); err != nil {
log.G(ctx).WithError(err).Errorf("Failed allocation for service %s", s.ID)
break
}
Expand All @@ -320,7 +326,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
break
}

if err := a.allocateService(ctx, nc, s); err != nil {
if err := a.allocateService(ctx, s); err != nil {
log.G(ctx).WithError(err).Errorf("Failed allocation during update of service %s", s.ID)
break
}
Expand All @@ -335,18 +341,18 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
// it's still there.
delete(nc.unallocatedServices, s.ID)
case state.EventCreateNode, state.EventUpdateNode, state.EventDeleteNode:
a.doNodeAlloc(ctx, nc, ev)
a.doNodeAlloc(ctx, ev)
case state.EventCreateTask, state.EventUpdateTask, state.EventDeleteTask:
a.doTaskAlloc(ctx, nc, ev)
a.doTaskAlloc(ctx, ev)
case state.EventCommit:
a.procUnallocatedNetworks(ctx, nc)
a.procUnallocatedServices(ctx, nc)
a.procUnallocatedTasksNetwork(ctx, nc)
a.procUnallocatedNetworks(ctx)
a.procUnallocatedServices(ctx)
a.procUnallocatedTasksNetwork(ctx)
return
}
}

func (a *Allocator) doNodeAlloc(ctx context.Context, nc *networkContext, ev events.Event) {
func (a *Allocator) doNodeAlloc(ctx context.Context, ev events.Event) {
var (
isDelete bool
node *api.Node
Expand All @@ -362,6 +368,8 @@ func (a *Allocator) doNodeAlloc(ctx context.Context, nc *networkContext, ev even
node = v.Node.Copy()
}

nc := a.netCtx

if isDelete {
if nc.nwkAllocator.IsNodeAllocated(node) {
if err := nc.nwkAllocator.DeallocateNode(node); err != nil {
Expand All @@ -377,7 +385,7 @@ func (a *Allocator) doNodeAlloc(ctx context.Context, nc *networkContext, ev even
}

node.Attachment.Network = nc.ingressNetwork.Copy()
if err := a.allocateNode(ctx, nc, node); err != nil {
if err := a.allocateNode(ctx, node); err != nil {
log.G(ctx).WithError(err).Errorf("Failed to allocate network resources for node %s", node.ID)
}
}
Expand Down Expand Up @@ -460,7 +468,7 @@ func (a *Allocator) taskCreateNetworkAttachments(t *api.Task, s *api.Service) {
taskUpdateNetworks(t, networks)
}

func (a *Allocator) doTaskAlloc(ctx context.Context, nc *networkContext, ev events.Event) {
func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
var (
isDelete bool
t *api.Task
Expand All @@ -476,6 +484,8 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, nc *networkContext, ev even
t = v.Task.Copy()
}

nc := a.netCtx

// If the task has stopped running or it's being deleted then
// we should free the network resources associated with the
// task right away.
Expand Down Expand Up @@ -526,7 +536,9 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, nc *networkContext, ev even
nc.unallocatedTasks[t.ID] = t
}

func (a *Allocator) allocateNode(ctx context.Context, nc *networkContext, node *api.Node) error {
func (a *Allocator) allocateNode(ctx context.Context, node *api.Node) error {
nc := a.netCtx

if err := nc.nwkAllocator.AllocateNode(node); err != nil {
return err
}
Expand Down Expand Up @@ -559,7 +571,9 @@ func (a *Allocator) allocateNode(ctx context.Context, nc *networkContext, node *
return nil
}

func (a *Allocator) allocateService(ctx context.Context, nc *networkContext, s *api.Service) error {
func (a *Allocator) allocateService(ctx context.Context, s *api.Service) error {
nc := a.netCtx

if s.Spec.Endpoint != nil {
// service has user-defined endpoint
if s.Endpoint == nil {
Expand Down Expand Up @@ -644,7 +658,9 @@ func (a *Allocator) allocateService(ctx context.Context, nc *networkContext, s *
return nil
}

func (a *Allocator) allocateNetwork(ctx context.Context, nc *networkContext, n *api.Network) error {
func (a *Allocator) allocateNetwork(ctx context.Context, n *api.Network) error {
nc := a.netCtx

if err := nc.nwkAllocator.Allocate(n); err != nil {
nc.unallocatedNetworks[n.ID] = n
return errors.Wrapf(err, "failed during network allocation for network %s", n.ID)
Expand All @@ -666,7 +682,7 @@ func (a *Allocator) allocateNetwork(ctx context.Context, nc *networkContext, n *
return nil
}

func (a *Allocator) allocateTask(ctx context.Context, nc *networkContext, tx store.Tx, t *api.Task) (*api.Task, error) {
func (a *Allocator) allocateTask(ctx context.Context, tx store.Tx, t *api.Task) (*api.Task, error) {
taskUpdated := false

// Get the latest task state from the store before updating.
Expand All @@ -675,6 +691,8 @@ func (a *Allocator) allocateTask(ctx context.Context, nc *networkContext, tx sto
return nil, fmt.Errorf("could not find task %s while trying to update network allocation", t.ID)
}

nc := a.netCtx

// We might be here even if a task allocation has already
// happened but wasn't successfully committed to store. In such
// cases skip allocation and go straight ahead to updating the
Expand Down Expand Up @@ -734,10 +752,11 @@ func (a *Allocator) allocateTask(ctx context.Context, nc *networkContext, tx sto
return storeT, nil
}

func (a *Allocator) procUnallocatedNetworks(ctx context.Context, nc *networkContext) {
func (a *Allocator) procUnallocatedNetworks(ctx context.Context) {
nc := a.netCtx
for _, n := range nc.unallocatedNetworks {
if !nc.nwkAllocator.IsAllocated(n) {
if err := a.allocateNetwork(ctx, nc, n); err != nil {
if err := a.allocateNetwork(ctx, n); err != nil {
log.G(ctx).Debugf("Failed allocation of unallocated network %s: %v", n.ID, err)
continue
}
Expand All @@ -747,10 +766,11 @@ func (a *Allocator) procUnallocatedNetworks(ctx context.Context, nc *networkCont
}
}

func (a *Allocator) procUnallocatedServices(ctx context.Context, nc *networkContext) {
func (a *Allocator) procUnallocatedServices(ctx context.Context) {
nc := a.netCtx
for _, s := range nc.unallocatedServices {
if !nc.nwkAllocator.IsServiceAllocated(s) {
if err := a.allocateService(ctx, nc, s); err != nil {
if err := a.allocateService(ctx, s); err != nil {
log.G(ctx).Debugf("Failed allocation of unallocated service %s: %v", s.ID, err)
continue
}
Expand All @@ -760,15 +780,16 @@ func (a *Allocator) procUnallocatedServices(ctx context.Context, nc *networkCont
}
}

func (a *Allocator) procUnallocatedTasksNetwork(ctx context.Context, nc *networkContext) {
func (a *Allocator) procUnallocatedTasksNetwork(ctx context.Context) {
nc := a.netCtx
tasks := make([]*api.Task, 0, len(nc.unallocatedTasks))

committed, err := a.store.Batch(func(batch *store.Batch) error {
for _, t := range nc.unallocatedTasks {
var allocatedT *api.Task
err := batch.Update(func(tx store.Tx) error {
var err error
allocatedT, err = a.allocateTask(ctx, nc, tx, t)
allocatedT, err = a.allocateTask(ctx, tx, t)
return err
})

Expand Down

0 comments on commit a8066c1

Please sign in to comment.