Skip to content

Commit

Permalink
Fix for service import cycles
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Nov 21, 2020
1 parent a0c4c5c commit 2e3c226
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 46 deletions.
40 changes: 40 additions & 0 deletions server/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -1343,10 +1343,26 @@ func (a *Account) AddServiceImportWithClaim(destination *Account, from, to strin
return ErrServiceImportAuthorization
}

if a.importFormsCycle(destination, from, to) {
return ErrServiceImportFormsCycle
}

_, err := a.addServiceImport(destination, from, to, imClaim)
return err
}

// Detects if we have a cycle.
func (a *Account) importFormsCycle(destination *Account, from, to string) bool {
// Check that what we are importing is not something we also export.
if a.serviceExportOverlaps(to) {
// So at this point if destination account is also importing from us, that forms a cycle.
if destination.serviceImportOverlaps(from) {
return true
}
}
return false
}

// SetServiceImportSharing will allow sharing of information about requests with the export account.
// Used for service latency tracking at the moment.
func (a *Account) SetServiceImportSharing(destination *Account, to string, allow bool) error {
Expand Down Expand Up @@ -1585,6 +1601,30 @@ func (a *Account) checkForReverseEntry(reply string, si *serviceImport, checkInt
}
}

// Internal check to see if the to subject overlaps with another export.
func (a *Account) serviceExportOverlaps(to string) bool {
a.mu.RLock()
defer a.mu.RUnlock()
for subj := range a.exports.services {
if to == subj || SubjectsCollide(to, subj) {
return true
}
}
return false
}

// Internal check to see if the from subject overlaps with another import.
func (a *Account) serviceImportOverlaps(from string) bool {
a.mu.RLock()
defer a.mu.RUnlock()
for subj := range a.imports.services {
if from == subj || SubjectsCollide(from, subj) {
return true
}
}
return false
}

// Internal check to see if a service import exists.
func (a *Account) serviceImportExists(dest *Account, from string) bool {
a.mu.RLock()
Expand Down
1 change: 1 addition & 0 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3705,6 +3705,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
}
continue
}

// Assume delivery subject is the normal subject to this point.
dsubj = subj

Expand Down
3 changes: 3 additions & 0 deletions server/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ var (
// ErrServiceImportAuthorization is returned when a service import is not authorized.
ErrServiceImportAuthorization = errors.New("service import not authorized")

// ErrServiceImportFormsCycle is returned when a service import forms a cycle.
ErrServiceImportFormsCycle = errors.New("service import forms cycle")

// ErrClientOrRouteConnectedToGatewayPort represents an error condition when
// a client or route attempted to connect to the Gateway port.
ErrClientOrRouteConnectedToGatewayPort = errors.New("attempted to connect to gateway port")
Expand Down
86 changes: 40 additions & 46 deletions test/service_latency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1304,68 +1304,62 @@ func TestServiceAndStreamStackOverflow(t *testing.T) {
}

func TestServiceCycle(t *testing.T) {
t.Skip("Remove the skip to run this test and shows the issue")

conf := createConfFile(t, []byte(`
accounts {
A {
users = [ { user: "a", pass: "a" } ]
exports [
{ service: help }
]
imports [
{ service { subject: help, account: B } }
]
exports [ { service: help } ]
imports [ { service { subject: help, account: B } } ]
}
B {
users = [ { user: "b", pass: "b" } ]
exports [
{ service: help }
]
imports [
{ service { subject: help, account: A } }
]
exports [ { service: help } ]
imports [ { service { subject: help, account: A } } ]
}
}
`))
defer os.Remove(conf)

srv, opts := RunServerWithConfig(conf)
defer srv.Shutdown()

// Responder (just request sub)
nc, err := nats.Connect(fmt.Sprintf("nats://a:a@%s:%d", opts.Host, opts.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
if _, err := server.ProcessConfigFile(conf); err == nil || !strings.Contains(err.Error(), server.ErrServiceImportFormsCycle.Error()) {
t.Fatalf("Expected an error on cycle service import, got none")
}
defer nc.Close()

cb := func(m *nats.Msg) {
m.Respond(nil)
}
sub, _ := nc.Subscribe("help", cb)
nc.Flush()

// Requestor
nc2, err := nats.Connect(fmt.Sprintf("nats://b:b@%s:%d", opts.Host, opts.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc2.Close()
conf = createConfFile(t, []byte(`
accounts {
A {
exports [ { service: * } ]
imports [ { service { subject: help, account: B } } ]
}
B {
exports [ { service: help } ]
imports [ { service { subject: *, account: A } } ]
}
}
`))
defer os.Remove(conf)

// Send a single request.
if _, err := nc2.Request("help", []byte("hi"), time.Second); err != nil {
t.Fatal("Did not get the reply")
if _, err := server.ProcessConfigFile(conf); err == nil || !strings.Contains(err.Error(), server.ErrServiceImportFormsCycle.Error()) {
t.Fatalf("Expected an error on cycle service import, got none")
}

// Make sure works for queue subscribers as well.
sub.Unsubscribe()
sub, _ = nc.QueueSubscribe("help", "prod", cb)
nc.Flush()
conf = createConfFile(t, []byte(`
accounts {
A {
exports [ { service: * } ]
imports [ { service { subject: help, account: B } } ]
}
B {
exports [ { service: help } ]
imports [ { service { subject: help, account: C } } ]
}
C {
exports [ { service: * } ]
imports [ { service { subject: *, account: A } } ]
}
}
`))
defer os.Remove(conf)

// Send a single request.
if _, err := nc2.Request("help", []byte("hi"), time.Second); err != nil {
t.Fatal("Did not get the reply")
if _, err := server.ProcessConfigFile(conf); err == nil || !strings.Contains(err.Error(), server.ErrServiceImportFormsCycle.Error()) {
t.Fatalf("Expected an error on cycle service import, got none")
}
}

Expand Down

0 comments on commit 2e3c226

Please # to comment.