Skip to content

Commit

Permalink
feat: streamx interface
Browse files Browse the repository at this point in the history
  • Loading branch information
jayantxie committed Feb 7, 2025
1 parent bfb90f8 commit 1117dfa
Showing 1 changed file with 1 addition and 85 deletions.
86 changes: 1 addition & 85 deletions pkg/streamx/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,6 @@ import (
"context"
)

var (
_ ServerStreamingClient[int] = (*GenericClientStream[int, int])(nil)
_ ClientStreamingClient[int, int] = (*GenericClientStream[int, int])(nil)
_ BidiStreamingClient[int, int] = (*GenericClientStream[int, int])(nil)
_ ServerStreamingServer[int] = (*GenericServerStream[int, int])(nil)
_ ClientStreamingServer[int, int] = (*GenericServerStream[int, int])(nil)
_ BidiStreamingServer[int, int] = (*GenericServerStream[int, int])(nil)
)

/* Streaming Mode
--------------- [Unary Streaming] ---------------
--------------- (Req) returns (Res) ---------------
Expand Down Expand Up @@ -123,6 +114,7 @@ type ClientStreamingClient[Req, Res any] interface {
// ClientStreamingServer define server side client streaming APIs
type ClientStreamingServer[Req, Res any] interface {
Recv(ctx context.Context) (*Req, error)
SendAndClose(ctx context.Context, res *Res) error
ServerStream
}

Expand All @@ -139,79 +131,3 @@ type BidiStreamingServer[Req, Res any] interface {
Send(ctx context.Context, res *Res) error
ServerStream
}

// NewGenericClientStream return a generic client stream
func NewGenericClientStream[Req, Res any](cs ClientStream) *GenericClientStream[Req, Res] {
return &GenericClientStream[Req, Res]{
ClientStream: cs,
}
}

// GenericClientStream wrap stream IO with Send/Recv middlewares
type GenericClientStream[Req, Res any] struct {
ClientStream
}

func (x *GenericClientStream[Req, Res]) SendMsg(ctx context.Context, m any) error {
return x.ClientStream.SendMsg(ctx, m)
}

func (x *GenericClientStream[Req, Res]) RecvMsg(ctx context.Context, m any) (err error) {
return x.ClientStream.RecvMsg(ctx, m)
}

func (x *GenericClientStream[Req, Res]) Send(ctx context.Context, m *Req) error {
return x.SendMsg(ctx, m)
}

func (x *GenericClientStream[Req, Res]) Recv(ctx context.Context) (m *Res, err error) {
m = new(Res)
if err = x.RecvMsg(ctx, m); err != nil {
return nil, err
}
return m, nil
}

func (x *GenericClientStream[Req, Res]) CloseAndRecv(ctx context.Context) (*Res, error) {
if err := x.ClientStream.CloseSend(ctx); err != nil {
return nil, err
}
return x.Recv(ctx)
}

// NewGenericServerStream return generic server stream
func NewGenericServerStream[Req, Res any](ss ServerStream) *GenericServerStream[Req, Res] {
return &GenericServerStream[Req, Res]{
ServerStream: ss,
}
}

// GenericServerStream wrap stream IO with Send/Recv middlewares
type GenericServerStream[Req, Res any] struct {
ServerStream
}

func (x *GenericServerStream[Req, Res]) SendMsg(ctx context.Context, m any) error {
return x.ServerStream.SendMsg(ctx, m)
}

func (x *GenericServerStream[Req, Res]) RecvMsg(ctx context.Context, m any) (err error) {
return x.ServerStream.RecvMsg(ctx, m)
}

func (x *GenericServerStream[Req, Res]) Send(ctx context.Context, m *Res) error {
return x.ServerStream.SendMsg(ctx, m)
}

func (x *GenericServerStream[Req, Res]) SendAndClose(ctx context.Context, m *Res) error {
return x.Send(ctx, m)
}

func (x *GenericServerStream[Req, Res]) Recv(ctx context.Context) (m *Req, err error) {
m = new(Req)
err = x.ServerStream.RecvMsg(ctx, m)
if err != nil {
return nil, err
}
return m, nil
}

0 comments on commit 1117dfa

Please # to comment.