Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Add flow client with sender/receiver #3405

Merged
merged 9 commits into from
Feb 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/firewall/uspfilter/conntrack/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

var logger = log.NewFromLogrus(logrus.StandardLogger())
var flowLogger = netflow.NewManager(context.Background()).GetLogger()
var flowLogger = netflow.NewManager(context.Background(), []byte{}).GetLogger()

// Memory pressure tests
func BenchmarkMemoryPressure(b *testing.B) {
Expand Down
2 changes: 1 addition & 1 deletion client/firewall/uspfilter/uspfilter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

var logger = log.NewFromLogrus(logrus.StandardLogger())
var flowLogger = netflow.NewManager(context.Background()).GetLogger()
var flowLogger = netflow.NewManager(context.Background(), []byte{}).GetLogger()

type IFaceMock struct {
SetFilterFunc func(device.PacketFilter) error
Expand Down
2 changes: 1 addition & 1 deletion client/internal/acl/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
mgmProto "github.com/netbirdio/netbird/management/proto"
)

var flowLogger = netflow.NewManager(context.Background()).GetLogger()
var flowLogger = netflow.NewManager(context.Background(), []byte{}).GetLogger()

func TestDefaultManager(t *testing.T) {
networkMap := &mgmProto.NetworkMap{
Expand Down
2 changes: 1 addition & 1 deletion client/internal/dns/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/netbirdio/netbird/formatter"
)

var flowLogger = netflow.NewManager(context.Background()).GetLogger()
var flowLogger = netflow.NewManager(context.Background(), []byte{}).GetLogger()

type mocWGIface struct {
filter device.PacketFilter
Expand Down
3 changes: 2 additions & 1 deletion client/internal/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ func NewEngine(
statusRecorder *peer.Status,
checks []*mgmProto.Checks,
) *Engine {
publicKey := config.WgPrivateKey.PublicKey()
engine := &Engine{
clientCtx: clientCtx,
clientCancel: clientCancel,
Expand All @@ -234,7 +235,7 @@ func NewEngine(
statusRecorder: statusRecorder,
checks: checks,
connSemaphore: semaphoregroup.NewSemaphoreGroup(connInitLimit),
flowManager: netflow.NewManager(clientCtx),
flowManager: netflow.NewManager(clientCtx, publicKey[:]),
}
if runtime.GOOS == "ios" {
if !fileExists(mobileDep.StateFilePath) {
Expand Down
4 changes: 4 additions & 0 deletions client/internal/netflow/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ func (l *Logger) GetEvents() []*types.Event {
return l.Store.GetEvents()
}

func (l *Logger) DeleteEvents(ids []string) {
l.Store.DeleteEvents(ids)
}

func (l *Logger) Close() {
l.stop()
l.cancel()
Expand Down
115 changes: 107 additions & 8 deletions client/internal/netflow/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,62 @@ package netflow
import (
"context"
"sync"
"time"

log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/netbirdio/netbird/client/internal/netflow/logger"
"github.com/netbirdio/netbird/client/internal/netflow/types"
"github.com/netbirdio/netbird/flow/client"
"github.com/netbirdio/netbird/flow/proto"
)

type Manager struct {
mux sync.Mutex
logger types.FlowLogger
flowConfig *types.FlowConfig
mux sync.Mutex
logger types.FlowLogger
flowConfig *types.FlowConfig
ctx context.Context
receiverClient *client.GRPCClient
publicKey []byte
}

func NewManager(ctx context.Context) *Manager {
func NewManager(ctx context.Context, publicKey []byte) *Manager {
return &Manager{
logger: logger.New(ctx),
logger: logger.New(ctx),
ctx: ctx,
publicKey: publicKey,
}
}

func (m *Manager) Update(update *types.FlowConfig) error {
m.mux.Lock()
defer m.mux.Unlock()
if update == nil {
return nil
}

m.mux.Lock()
defer m.mux.Unlock()
previous := m.flowConfig
m.flowConfig = update

if update.Enabled {
m.logger.Enable()
if previous == nil || !previous.Enabled {
flowClient, err := client.NewClient(m.ctx, m.flowConfig.URL, m.flowConfig.TokenPayload, m.flowConfig.TokenSignature)
if err != nil {
return err
}
log.Infof("flow client connected to %s", m.flowConfig.URL)
m.receiverClient = flowClient
go m.receiveACKs()
go m.startSender()
}
return nil
}

m.logger.Disable()
if previous != nil && previous.Enabled {
return m.receiverClient.Close()
}

return nil
}
Expand All @@ -46,3 +70,78 @@ func (m *Manager) Close() {
func (m *Manager) GetLogger() types.FlowLogger {
return m.logger
}

func (m *Manager) startSender() {
ticker := time.NewTicker(m.flowConfig.Interval)
defer ticker.Stop()
for {
select {
case <-m.ctx.Done():
return
case <-ticker.C:
events := m.logger.GetEvents()
for _, event := range events {
log.Infof("send flow event to server: %s", event.ID)
err := m.send(event)
if err != nil {
log.Errorf("send flow event to server: %s", err)
}
}
}
}
}

func (m *Manager) receiveACKs() {
if m.receiverClient == nil {
return
}
err := m.receiverClient.Receive(m.ctx, func(ack *proto.FlowEventAck) error {
log.Infof("receive flow event ack: %s", ack.EventId)
m.logger.DeleteEvents([]string{ack.EventId})
return nil
})
if err != nil {
log.Errorf("receive flow event ack: %s", err)
}
}

func (m *Manager) send(event *types.Event) error {
if m.receiverClient == nil {
return nil
}
return m.receiverClient.Send(m.ctx, toProtoEvent(m.publicKey, event))
}

func toProtoEvent(publicKey []byte, event *types.Event) *proto.FlowEvent {
protoEvent := &proto.FlowEvent{
EventId: event.ID,
FlowId: event.FlowID.String(),
Timestamp: timestamppb.New(event.Timestamp),
PublicKey: publicKey,
EventFields: &proto.EventFields{
Type: proto.Type(event.Type),
Direction: proto.Direction(event.Direction),
Protocol: uint32(event.Protocol),
SourceIp: event.SourceIP.AsSlice(),
DestIp: event.DestIP.AsSlice(),
},
}
if event.Protocol == 1 {
protoEvent.EventFields.ConnectionInfo = &proto.EventFields_IcmpInfo{
IcmpInfo: &proto.ICMPInfo{
IcmpType: uint32(event.ICMPType),
IcmpCode: uint32(event.ICMPCode),
},
}
return protoEvent
}

protoEvent.EventFields.ConnectionInfo = &proto.EventFields_PortInfo{
PortInfo: &proto.PortInfo{
SourcePort: uint32(event.SourcePort),
DestPort: uint32(event.DestPort),
},
}

return protoEvent
}
2 changes: 2 additions & 0 deletions client/internal/netflow/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ type FlowLogger interface {
StoreEvent(flowEvent EventFields)
// GetEvents returns all stored events
GetEvents() []*Event
// DeleteEvents deletes events from the store
DeleteEvents([]string)
// Close closes the logger
Close()
// Enable enables the flow logger receiver
Expand Down
32 changes: 32 additions & 0 deletions flow/client/auth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package client

import (
"context"
"fmt"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

var _ credentials.PerRPCCredentials = (*authToken)(nil)

type authToken struct {
metaMap map[string]string
}

func (t authToken) GetRequestMetadata(context.Context, ...string) (map[string]string, error) {
return t.metaMap, nil
}

func (authToken) RequireTransportSecurity() bool {
return false // Set to true if you want to require a secure connection
}

// WithAuthToken returns a DialOption which sets the receiver flow credentials and places auth state on each outbound RPC
func withAuthToken(payload, signature string) grpc.DialOption {
value := fmt.Sprintf("%s.%s", signature, payload)
authMap := map[string]string{
"authorization": "Bearer " + value,
}
return grpc.WithPerRPCCredentials(authToken{metaMap: authMap})
}
Loading
Loading