From 17dc3c6dbfb51ae13ecb515d7d2fa1d8ebc65b04 Mon Sep 17 00:00:00 2001 From: Adib Rastegarnia Date: Wed, 22 Feb 2023 11:48:55 -0800 Subject: [PATCH] Add topo client (#28) * Add topo client for the purpose of apps and subsystems * version --- VERSION | 2 +- go.mod | 1 + go.sum | 1 + pkg/topo/client.go | 172 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 175 insertions(+), 1 deletion(-) create mode 100644 pkg/topo/client.go diff --git a/VERSION b/VERSION index befa759..512a1fa 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.1.9-dev +1.1.9 diff --git a/go.mod b/go.mod index d6facea..173c6ae 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( require ( github.com/Shopify/sarama v1.31.1 // indirect github.com/atomix/runtime/sdk v0.7.4 // indirect + github.com/cenkalti/backoff v2.2.1+incompatible // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/eapache/go-resiliency v1.2.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect diff --git a/go.sum b/go.sum index c57a546..26a2e9e 100644 --- a/go.sum +++ b/go.sum @@ -82,6 +82,7 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bits-and-blooms/bitset v1.2.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA= github.com/bits-and-blooms/bloom/v3 v3.2.0/go.mod h1:MC8muvBzzPOFsrcdND/A7kU7kMhkqb9KI70JlZCP+C8= +github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= diff --git a/pkg/topo/client.go b/pkg/topo/client.go new file mode 100644 index 0000000..1da802a --- /dev/null +++ b/pkg/topo/client.go @@ -0,0 +1,172 @@ +// SPDX-FileCopyrightText: 2020-present Open Networking Foundation +// +// SPDX-License-Identifier: Apache-2.0 + +// Package topo store client +package topo + +import ( + "context" + "io" + + "github.com/onosproject/onos-lib-go/pkg/grpc/retry" + "google.golang.org/grpc/codes" + + "github.com/onosproject/onos-lib-go/pkg/errors" + + topoapi "github.com/onosproject/onos-api/go/onos/topo" + "github.com/onosproject/onos-lib-go/pkg/logging" + "google.golang.org/grpc" +) + +var log = logging.GetLogger() + +// Store is a topology store client interface +type Store interface { + // Create creates a topology object + Create(ctx context.Context, object *topoapi.Object) error + + // Update updates an existing topology object + Update(ctx context.Context, object *topoapi.Object) error + + // Get gets a topology object + Get(ctx context.Context, id topoapi.ID) (*topoapi.Object, error) + + // Query streams objects to the given channel + Query(ctx context.Context, ch chan<- *topoapi.Object, filters *topoapi.Filters) error + + // Delete deletes a topology object using the given ID + Delete(ctx context.Context, object *topoapi.Object) error + + // Watch watches topology events + Watch(ctx context.Context, ch chan<- topoapi.Event, filters *topoapi.Filters) error +} + +// NewStore creates a new topology store +func NewStore(topoEndpoint string, opts ...grpc.DialOption) (Store, error) { + if len(opts) == 0 { + return nil, errors.New(errors.Invalid, "no opts given when creating topology store") + } + opts = append(opts, + grpc.WithStreamInterceptor(retry.RetryingStreamClientInterceptor(retry.WithRetryOn(codes.Unavailable, codes.Unknown))), + grpc.WithUnaryInterceptor(retry.RetryingUnaryClientInterceptor(retry.WithRetryOn(codes.Unavailable, codes.Unknown)))) + conn, err := grpc.DialContext(context.Background(), topoEndpoint, opts...) + if err != nil { + log.Warn(err) + return nil, err + } + client := topoapi.CreateTopoClient(conn) + return &topoClient{ + client: client, + }, nil +} + +type topoClient struct { + client topoapi.TopoClient +} + +// Create creates topology object in topo store +func (s *topoClient) Create(ctx context.Context, object *topoapi.Object) error { + log.Debugf("Creating topology object: %v", object) + _, err := s.client.Create(ctx, &topoapi.CreateRequest{ + Object: object, + }) + if err != nil { + log.Warn(err) + return errors.FromGRPC(err) + } + return nil +} + +// Update updates the given topology object in topo store +func (s *topoClient) Update(ctx context.Context, object *topoapi.Object) error { + log.Debugf("Updating topology object: %v", object) + response, err := s.client.Update(ctx, &topoapi.UpdateRequest{ + Object: object, + }) + if err != nil { + return errors.FromGRPC(err) + } + object = response.Object + log.Debug("Updated topology object is:", object) + return nil +} + +// Get gets topology object based on a given ID +func (s *topoClient) Get(ctx context.Context, id topoapi.ID) (*topoapi.Object, error) { + log.Debugf("Getting topology object with ID: %v", id) + getResponse, err := s.client.Get(ctx, &topoapi.GetRequest{ + ID: id, + }) + if err != nil { + return nil, errors.FromGRPC(err) + } + return getResponse.Object, nil +} + +// Query query topology objects based on a give set of filters +func (s *topoClient) Query(ctx context.Context, ch chan<- *topoapi.Object, filters *topoapi.Filters) error { + log.Debugf("Query topology objects") + stream, err := s.client.Query(ctx, &topoapi.QueryRequest{ + Filters: filters, + }) + if err != nil { + return errors.FromGRPC(err) + } + go func() { + defer close(ch) + for { + resp, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + log.Warn(err) + break + } + ch <- resp.Object + } + }() + return nil +} + +// Delete deletes topology object using the given ID +func (s *topoClient) Delete(ctx context.Context, object *topoapi.Object) error { + log.Debugf("Deleting topology object with ID: %v", object.ID) + _, err := s.client.Delete(ctx, &topoapi.DeleteRequest{ + ID: object.GetID(), + Revision: object.GetRevision(), + }) + if err != nil { + return errors.FromGRPC(err) + } + return nil +} + +// Watch watches topology events +func (s *topoClient) Watch(ctx context.Context, ch chan<- topoapi.Event, filters *topoapi.Filters) error { + stream, err := s.client.Watch(ctx, &topoapi.WatchRequest{ + Noreplay: false, + Filters: filters, + }) + if err != nil { + return errors.FromGRPC(err) + } + go func() { + defer close(ch) + for { + resp, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + log.Warn(err) + break + } + ch <- resp.Event + } + }() + return nil +} + +var _ Store = &topoClient{}