diff --git a/manager/allocator/allocator.go b/manager/allocator/allocator.go index b1c0058065..08a0ca71c9 100644 --- a/manager/allocator/allocator.go +++ b/manager/allocator/allocator.go @@ -125,6 +125,8 @@ func (a *Allocator) Run(ctx context.Context) error { aaCopy := aa actor := func() error { wg.Add(1) + defer wg.Done() + // init might return an allocator specific context // which is a child of the passed in context to hold // allocator specific state @@ -133,10 +135,10 @@ func (a *Allocator) Run(ctx context.Context) error { // if we are failing in the init of // this allocator. aa.cancel() - wg.Done() return err } + wg.Add(1) go func() { defer wg.Done() a.run(ctx, aaCopy) diff --git a/manager/allocator/network.go b/manager/allocator/network.go index 700143c40c..3e6cdefdc8 100644 --- a/manager/allocator/network.go +++ b/manager/allocator/network.go @@ -68,7 +68,7 @@ type networkContext struct { unallocatedNetworks map[string]*api.Network } -func (a *Allocator) doNetworkInit(ctx context.Context) error { +func (a *Allocator) doNetworkInit(ctx context.Context) (err error) { na, err := networkallocator.New() if err != nil { return err @@ -81,6 +81,13 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error { unallocatedNetworks: make(map[string]*api.Network), ingressNetwork: newIngressNetwork(), } + a.netCtx = nc + defer func() { + // Clear a.netCtx if initialization was unsuccessful. + if err != nil { + a.netCtx = nil + } + }() // Check if we have the ingress network. If not found create // it before reading all network objects for allocation. @@ -125,7 +132,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error { // that the we can get the preferred subnet for ingress // network. if !na.IsAllocated(nc.ingressNetwork) { - if err := a.allocateNetwork(ctx, nc, nc.ingressNetwork); err != nil { + if err := a.allocateNetwork(ctx, nc.ingressNetwork); err != nil { log.G(ctx).WithError(err).Error("failed allocating ingress network during init") } @@ -155,7 +162,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error { continue } - if err := a.allocateNetwork(ctx, nc, n); err != nil { + if err := a.allocateNetwork(ctx, n); err != nil { log.G(ctx).WithError(err).Errorf("failed allocating network %s during init", n.ID) } } @@ -179,7 +186,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error { } node.Attachment.Network = nc.ingressNetwork.Copy() - if err := a.allocateNode(ctx, nc, node); err != nil { + if err := a.allocateNode(ctx, node); err != nil { log.G(ctx).WithError(err).Errorf("Failed to allocate network resources for node %s during init", node.ID) } } @@ -198,7 +205,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error { continue } - if err := a.allocateService(ctx, nc, s); err != nil { + if err := a.allocateService(ctx, s); err != nil { log.G(ctx).WithError(err).Errorf("failed allocating service %s during init", s.ID) } } @@ -260,7 +267,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error { } err := batch.Update(func(tx store.Tx) error { - _, err := a.allocateTask(ctx, nc, tx, t) + _, err := a.allocateTask(ctx, tx, t) return err }) if err != nil { @@ -274,7 +281,6 @@ func (a *Allocator) doNetworkInit(ctx context.Context) error { return err } - a.netCtx = nc return nil } @@ -288,7 +294,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) { break } - if err := a.allocateNetwork(ctx, nc, n); err != nil { + if err := a.allocateNetwork(ctx, n); err != nil { log.G(ctx).WithError(err).Errorf("Failed allocation for network %s", n.ID) break } @@ -309,7 +315,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) { break } - if err := a.allocateService(ctx, nc, s); err != nil { + if err := a.allocateService(ctx, s); err != nil { log.G(ctx).WithError(err).Errorf("Failed allocation for service %s", s.ID) break } @@ -320,7 +326,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) { break } - if err := a.allocateService(ctx, nc, s); err != nil { + if err := a.allocateService(ctx, s); err != nil { log.G(ctx).WithError(err).Errorf("Failed allocation during update of service %s", s.ID) break } @@ -335,18 +341,18 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) { // it's still there. delete(nc.unallocatedServices, s.ID) case state.EventCreateNode, state.EventUpdateNode, state.EventDeleteNode: - a.doNodeAlloc(ctx, nc, ev) + a.doNodeAlloc(ctx, ev) case state.EventCreateTask, state.EventUpdateTask, state.EventDeleteTask: - a.doTaskAlloc(ctx, nc, ev) + a.doTaskAlloc(ctx, ev) case state.EventCommit: - a.procUnallocatedNetworks(ctx, nc) - a.procUnallocatedServices(ctx, nc) - a.procUnallocatedTasksNetwork(ctx, nc) + a.procUnallocatedNetworks(ctx) + a.procUnallocatedServices(ctx) + a.procUnallocatedTasksNetwork(ctx) return } } -func (a *Allocator) doNodeAlloc(ctx context.Context, nc *networkContext, ev events.Event) { +func (a *Allocator) doNodeAlloc(ctx context.Context, ev events.Event) { var ( isDelete bool node *api.Node @@ -362,6 +368,8 @@ func (a *Allocator) doNodeAlloc(ctx context.Context, nc *networkContext, ev even node = v.Node.Copy() } + nc := a.netCtx + if isDelete { if nc.nwkAllocator.IsNodeAllocated(node) { if err := nc.nwkAllocator.DeallocateNode(node); err != nil { @@ -377,7 +385,7 @@ func (a *Allocator) doNodeAlloc(ctx context.Context, nc *networkContext, ev even } node.Attachment.Network = nc.ingressNetwork.Copy() - if err := a.allocateNode(ctx, nc, node); err != nil { + if err := a.allocateNode(ctx, node); err != nil { log.G(ctx).WithError(err).Errorf("Failed to allocate network resources for node %s", node.ID) } } @@ -460,7 +468,7 @@ func (a *Allocator) taskCreateNetworkAttachments(t *api.Task, s *api.Service) { taskUpdateNetworks(t, networks) } -func (a *Allocator) doTaskAlloc(ctx context.Context, nc *networkContext, ev events.Event) { +func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) { var ( isDelete bool t *api.Task @@ -476,6 +484,8 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, nc *networkContext, ev even t = v.Task.Copy() } + nc := a.netCtx + // If the task has stopped running or it's being deleted then // we should free the network resources associated with the // task right away. @@ -526,7 +536,9 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, nc *networkContext, ev even nc.unallocatedTasks[t.ID] = t } -func (a *Allocator) allocateNode(ctx context.Context, nc *networkContext, node *api.Node) error { +func (a *Allocator) allocateNode(ctx context.Context, node *api.Node) error { + nc := a.netCtx + if err := nc.nwkAllocator.AllocateNode(node); err != nil { return err } @@ -559,7 +571,9 @@ func (a *Allocator) allocateNode(ctx context.Context, nc *networkContext, node * return nil } -func (a *Allocator) allocateService(ctx context.Context, nc *networkContext, s *api.Service) error { +func (a *Allocator) allocateService(ctx context.Context, s *api.Service) error { + nc := a.netCtx + if s.Spec.Endpoint != nil { // service has user-defined endpoint if s.Endpoint == nil { @@ -644,7 +658,9 @@ func (a *Allocator) allocateService(ctx context.Context, nc *networkContext, s * return nil } -func (a *Allocator) allocateNetwork(ctx context.Context, nc *networkContext, n *api.Network) error { +func (a *Allocator) allocateNetwork(ctx context.Context, n *api.Network) error { + nc := a.netCtx + if err := nc.nwkAllocator.Allocate(n); err != nil { nc.unallocatedNetworks[n.ID] = n return errors.Wrapf(err, "failed during network allocation for network %s", n.ID) @@ -666,7 +682,7 @@ func (a *Allocator) allocateNetwork(ctx context.Context, nc *networkContext, n * return nil } -func (a *Allocator) allocateTask(ctx context.Context, nc *networkContext, tx store.Tx, t *api.Task) (*api.Task, error) { +func (a *Allocator) allocateTask(ctx context.Context, tx store.Tx, t *api.Task) (*api.Task, error) { taskUpdated := false // Get the latest task state from the store before updating. @@ -675,6 +691,8 @@ func (a *Allocator) allocateTask(ctx context.Context, nc *networkContext, tx sto return nil, fmt.Errorf("could not find task %s while trying to update network allocation", t.ID) } + nc := a.netCtx + // We might be here even if a task allocation has already // happened but wasn't successfully committed to store. In such // cases skip allocation and go straight ahead to updating the @@ -734,10 +752,11 @@ func (a *Allocator) allocateTask(ctx context.Context, nc *networkContext, tx sto return storeT, nil } -func (a *Allocator) procUnallocatedNetworks(ctx context.Context, nc *networkContext) { +func (a *Allocator) procUnallocatedNetworks(ctx context.Context) { + nc := a.netCtx for _, n := range nc.unallocatedNetworks { if !nc.nwkAllocator.IsAllocated(n) { - if err := a.allocateNetwork(ctx, nc, n); err != nil { + if err := a.allocateNetwork(ctx, n); err != nil { log.G(ctx).Debugf("Failed allocation of unallocated network %s: %v", n.ID, err) continue } @@ -747,10 +766,11 @@ func (a *Allocator) procUnallocatedNetworks(ctx context.Context, nc *networkCont } } -func (a *Allocator) procUnallocatedServices(ctx context.Context, nc *networkContext) { +func (a *Allocator) procUnallocatedServices(ctx context.Context) { + nc := a.netCtx for _, s := range nc.unallocatedServices { if !nc.nwkAllocator.IsServiceAllocated(s) { - if err := a.allocateService(ctx, nc, s); err != nil { + if err := a.allocateService(ctx, s); err != nil { log.G(ctx).Debugf("Failed allocation of unallocated service %s: %v", s.ID, err) continue } @@ -760,7 +780,8 @@ func (a *Allocator) procUnallocatedServices(ctx context.Context, nc *networkCont } } -func (a *Allocator) procUnallocatedTasksNetwork(ctx context.Context, nc *networkContext) { +func (a *Allocator) procUnallocatedTasksNetwork(ctx context.Context) { + nc := a.netCtx tasks := make([]*api.Task, 0, len(nc.unallocatedTasks)) committed, err := a.store.Batch(func(batch *store.Batch) error { @@ -768,7 +789,7 @@ func (a *Allocator) procUnallocatedTasksNetwork(ctx context.Context, nc *network var allocatedT *api.Task err := batch.Update(func(tx store.Tx) error { var err error - allocatedT, err = a.allocateTask(ctx, nc, tx, t) + allocatedT, err = a.allocateTask(ctx, tx, t) return err })