Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

feat: add GRPCGatewayRegister interface #38

Merged
merged 2 commits into from
Dec 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/gmqttd/command/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,14 @@ func NewStartCmd() *cobra.Command {
)
err = s.Init()
if err != nil {
fmt.Println(err)
os.Exit(1)
return
}
err = s.Run()
if err != nil {
fmt.Println(err)
os.Exit(1)
return
}
installSignal(s)
Expand Down
88 changes: 56 additions & 32 deletions plugin/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/DrmagicE/gmqtt/config"
"github.com/DrmagicE/gmqtt/server"
Expand All @@ -35,6 +33,24 @@ func New(config config.Config) (server.Plugin, error) {

var log *zap.Logger

// GRPCGatewayRegister provides the ability to share the gRPC and HTTP server to other plugins.
type GRPCGatewayRegister interface {
GRPCRegister
HTTPRegister
}

// GRPCRegister is the interface that enable the implement to expose gRPC endpoint.
type GRPCRegister interface {
// RegisterGRPC registers the gRPC handler into gRPC server which created by admin plugin.
RegisterGRPC(s grpc.ServiceRegistrar)
}

// HTTPRegister is the interface that enable the implement to expose HTTP endpoint.
type HTTPRegister interface {
// RegisterHTTP registers the http handler into http server which created by admin plugin.
RegisterHTTP(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) (err error)
}

// Admin providers gRPC and HTTP API that enables the external system to interact with the broker.
type Admin struct {
config Config
Expand All @@ -46,26 +62,32 @@ type Admin struct {
store *store
}

func (a *Admin) registerHTTP() (err error) {
mux := runtime.NewServeMux(runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.JSONPb{OrigName: true, EmitDefaults: true}))

func (a *Admin) registerHTTP(mux *runtime.ServeMux) (err error) {
err = RegisterClientServiceHandlerFromEndpoint(
context.Background(),
mux,
a.config.GRPC.Addr,
[]grpc.DialOption{grpc.WithInsecure()})
[]grpc.DialOption{grpc.WithInsecure()},
)
if err != nil {
return err
}

err = RegisterSubscriptionServiceHandlerFromEndpoint(
context.Background(),
mux,
a.config.GRPC.Addr,
[]grpc.DialOption{grpc.WithInsecure()})

[]grpc.DialOption{grpc.WithInsecure()},
)
if err != nil {
return err
}
err = RegisterPublishServiceHandlerFromEndpoint(
context.Background(),
mux,
a.config.GRPC.Addr,
[]grpc.DialOption{grpc.WithInsecure()})
[]grpc.DialOption{grpc.WithInsecure()},
)

if err != nil {
return err
Expand All @@ -92,9 +114,34 @@ func (a *Admin) Load(service server.Server) error {
grpc_prometheus.UnaryServerInterceptor),
)
a.grpcServer = s

RegisterClientServiceServer(s, &clientService{a: a})
RegisterSubscriptionServiceServer(s, &subscriptionService{a: a})
RegisterPublishServiceServer(s, &publisher{a: a})
mux := runtime.NewServeMux(runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.JSONPb{OrigName: true, EmitDefaults: true}))
if a.config.HTTP.Enable {
err := a.registerHTTP(mux)
if err != nil {
return err
}
}

for _, v := range service.Plugins() {
if v, ok := v.(GRPCRegister); ok {
v.RegisterGRPC(s)
}

if v, ok := v.(HTTPRegister); a.config.HTTP.Enable && ok {
err := v.RegisterHTTP(context.Background(),
mux,
a.config.GRPC.Addr,
[]grpc.DialOption{grpc.WithInsecure()})
if err != nil {
return err
}
}

}
l, err := net.Listen("tcp", a.config.GRPC.Addr)
if err != nil {
return err
Expand All @@ -111,9 +158,6 @@ func (a *Admin) Load(service server.Server) error {
panic(err)
}
}()
if a.config.HTTP.Enable {
err = a.registerHTTP()
}
return err
}

Expand All @@ -128,23 +172,3 @@ func (a *Admin) Unload() error {
func (a *Admin) Name() string {
return Name
}

func getPage(reqPage, reqPageSize uint32) (page, pageSize uint) {
page = 1
pageSize = 20
if reqPage != 0 {
page = uint(reqPage)
}
if reqPageSize != 0 {
pageSize = uint(reqPageSize)
}
return
}

func InvalidArgument(name string, msg string) error {
errString := "invalid " + name
if msg != "" {
errString = errString + ":" + msg
}
return status.Error(codes.InvalidArgument, errString)
}
9 changes: 6 additions & 3 deletions plugin/admin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func (c *clientService) mustEmbedUnimplementedClientServiceServer() {

// List lists clients information which the session is valid in the broker (both connected and disconnected).
func (c *clientService) List(ctx context.Context, req *ListClientRequest) (*ListClientResponse, error) {
page, pageSize := getPage(req.Page, req.PageSize)
page, pageSize := GetPage(req.Page, req.PageSize)
clients, total, err := c.a.store.GetClients(page, pageSize)
if err != nil {
return &ListClientResponse{}, err
Expand All @@ -30,9 +30,12 @@ func (c *clientService) List(ctx context.Context, req *ListClientRequest) (*List
// Get returns the client information for given request client id.
func (c *clientService) Get(ctx context.Context, req *GetClientRequest) (*GetClientResponse, error) {
if req.ClientId == "" {
return nil, InvalidArgument("client_id", "")
return nil, ErrInvalidArgument("client_id", "")
}
client := c.a.store.GetClientByID(req.ClientId)
if client == nil {
return nil, ErrNotFound
}
return &GetClientResponse{
Client: client,
}, nil
Expand All @@ -41,7 +44,7 @@ func (c *clientService) Get(ctx context.Context, req *GetClientRequest) (*GetCli
// Delete force disconnect.
func (c *clientService) Delete(ctx context.Context, req *DeleteClientRequest) (*empty.Empty, error) {
if req.ClientId == "" {
return nil, InvalidArgument("client_id", "")
return nil, ErrInvalidArgument("client_id", "")
}
if req.CleanSession {
c.a.clientService.TerminateSession(req.ClientId)
Expand Down
5 changes: 2 additions & 3 deletions plugin/admin/client.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions plugin/admin/client_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions plugin/admin/protos/client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,20 @@ message Client {


service ClientService {
// lists clients
// List clients
rpc List (ListClientRequest) returns (ListClientResponse){
option (google.api.http) = {
get: "/v1/clients"
};
}
// Get the client for given client id.
// Return NotFound error when client not found.
rpc Get (GetClientRequest) returns (GetClientResponse){
option (google.api.http) = {
get: "/v1/clients/{client_id}"
};
}
// disconnect the client for given client id.
// Disconnect the client for given client id.
rpc Delete (DeleteClientRequest) returns (google.protobuf.Empty) {
option (google.api.http) = {
delete: "/v1/clients/{client_id}"
Expand Down
2 changes: 1 addition & 1 deletion plugin/admin/protos/publish.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ message UserProperties {
}

service PublishService {
// publish message to broker
// Publish message to broker
rpc Publish (PublishRequest) returns (google.protobuf.Empty){
option (google.api.http) = {
post: "/v1/publish"
Expand Down
8 changes: 4 additions & 4 deletions plugin/admin/protos/subscription.proto
Original file line number Diff line number Diff line change
Expand Up @@ -75,26 +75,26 @@ message Subscription {
string client_id = 7;
}
service SubscriptionService {
// list subscriptions.
// List subscriptions.
rpc List (ListSubscriptionRequest) returns (ListSubscriptionResponse){
option (google.api.http) = {
get: "/v1/subscriptions"
};
}
// filter subscriptions, paging is not supported in this api.
// Filter subscriptions, paging is not supported in this API.
rpc Filter(FilterSubscriptionRequest) returns (FilterSubscriptionResponse) {
option (google.api.http) = {
get: "/v1/filter_subscriptions"
};
}
// subscribe topics for the client.
// Subscribe topics for the client.
rpc Subscribe (SubscribeRequest) returns (SubscribeResponse) {
option (google.api.http) = {
post: "/v1/subscribe"
body:"*"
};
}
// unsubscribe topics for the client.
// Unsubscribe topics for the client.
rpc Unsubscribe (UnsubscribeRequest) returns (google.protobuf.Empty) {
option (google.api.http) = {
post: "/v1/unsubscribe"
Expand Down
8 changes: 4 additions & 4 deletions plugin/admin/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ func (p *publisher) mustEmbedUnimplementedPublishServiceServer() {
// Publish publishes a message into broker.
func (p *publisher) Publish(ctx context.Context, req *PublishRequest) (resp *empty.Empty, err error) {
if !packets.ValidV5Topic([]byte(req.TopicName)) {
return nil, InvalidArgument("topic_name", "")
return nil, ErrInvalidArgument("topic_name", "")
}
if req.Qos > uint32(packets.Qos2) {
return nil, InvalidArgument("qos", "")
return nil, ErrInvalidArgument("qos", "")
}
if req.PayloadFormat != 0 && req.PayloadFormat != 1 {
return nil, InvalidArgument("payload_format", "")
return nil, ErrInvalidArgument("payload_format", "")
}
if req.ResponseTopic != "" && !packets.ValidV5Topic([]byte(req.ResponseTopic)) {
return nil, InvalidArgument("response_topic", "")
return nil, ErrInvalidArgument("response_topic", "")
}
var userPpt []packets.UserProperty
for _, v := range req.UserProperties {
Expand Down
5 changes: 2 additions & 3 deletions plugin/admin/publish.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions plugin/admin/publish_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading