Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

filters/fields: do a deep copy before filtering #1726

Merged
merged 1 commit into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 8 additions & 14 deletions cmd/tetra/getevents/io_reader_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
hubbleFilters "github.com/cilium/tetragon/pkg/oldhubble/filters"
"google.golang.org/grpc"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)

// ioReaderClient implements tetragon.FineGuidanceSensors_GetEventsClient.
Expand Down Expand Up @@ -101,28 +100,23 @@ func (i *ioReaderClient) GetVersion(_ context.Context, _ *tetragon.GetVersionReq

func (i *ioReaderClient) Recv() (*tetragon.GetEventsResponse, error) {
for i.scanner.Scan() {
var res tetragon.GetEventsResponse
res := &tetragon.GetEventsResponse{}
line := i.scanner.Bytes()
err := i.unmarshaller.Unmarshal(line, &res)
err := i.unmarshaller.Unmarshal(line, res)
if err != nil && i.debug {
fmt.Fprintf(os.Stderr, "DEBUG: failed unmarshal: %s: %s\n", line, err)
continue
}
if !hubbleFilters.Apply(i.allowlist, nil, &hubbleV1.Event{Event: &res}) {
if !hubbleFilters.Apply(i.allowlist, nil, &hubbleV1.Event{Event: res}) {
continue
}
filterEvent := &res
if len(i.fieldFilters) > 0 && filterEvent.GetProcessExec() != nil { // this is an exec event and we have fieldFilters
// We need a copy of the exec event as modifing the original message
// can cause issues in the process cache (we keep a copy of that message there).
filterEvent = proto.Clone(&res).(*tetragon.GetEventsResponse)
}
for _, filter := range i.fieldFilters {
// we need not to change res
// maybe only for exec events
filter.Filter(filterEvent)
res, err = filter.Filter(res)
if err != nil {
return nil, err
}
}
return filterEvent, nil
return res, nil
}
if err := i.scanner.Err(); err != nil {
return nil, err
Expand Down
19 changes: 15 additions & 4 deletions pkg/filters/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/cilium/tetragon/api/v1/tetragon"
"github.com/mennanov/fmutils"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
)

Expand Down Expand Up @@ -144,7 +145,17 @@ func FieldFiltersFromGetEventsRequest(request *tetragon.GetEventsRequest) []*Fie
// Filter filters the fields in the GetEventsResponse, keeping fields specified in the
// inclusion filter and discarding fields specified in the exclusion filter. Exclusion
// takes precedence over inclusion and an empty filter set will keep all remaining fields.
func (f *FieldFilter) Filter(event *tetragon.GetEventsResponse) error {
func (f *FieldFilter) Filter(event *tetragon.GetEventsResponse) (*tetragon.GetEventsResponse, error) {
// We need to deep copy the event here to avoid issues caused by filtering out
// information that is shared between events through the event cache (e.g. process
// info). This can cause segmentation faults and other nasty bugs. Avoid all that by
// doing a deep copy here before filtering.
//
// FIXME: We need to fix this so that it doesn't kill performance by doing a deep
// copy. This will require architectural changes to both the field filters and the
// event cache.
event = proto.Clone(event).(*tetragon.GetEventsResponse)

if len(f.eventSet) > 0 {
// skip filtering by default unless the event set is inverted, in which case we
// want to filter by default and skip only if we have a match
Expand All @@ -170,7 +181,7 @@ func (f *FieldFilter) Filter(event *tetragon.GetEventsResponse) error {
}

if skipFiltering {
return nil
return event, nil
}
}

Expand All @@ -190,8 +201,8 @@ func (f *FieldFilter) Filter(event *tetragon.GetEventsResponse) error {
})

if !rft.IsValid() {
return fmt.Errorf("invalid event after field filter")
return nil, fmt.Errorf("invalid event after field filter")
}

return nil
return event, nil
}
17 changes: 9 additions & 8 deletions pkg/filters/fields_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestEventFieldFilters(t *testing.T) {
// Construct the filter
filters := FieldFiltersFromGetEventsRequest(request)
for _, filter := range filters {
filter.Filter(ev)
ev, _ = filter.Filter(ev)
}

// These fields should all have been included and so should not be empty
Expand Down Expand Up @@ -125,12 +125,12 @@ func TestFieldFilterByEventType(t *testing.T) {
}

filter := NewExcludeFieldFilter([]tetragon.EventType{tetragon.EventType_PROCESS_EXIT}, []string{"process.pid"}, false)
filter.Filter(ev)
ev, _ = filter.Filter(ev)

assert.NotEmpty(t, ev.GetProcessExec().Process.Pid)

filter = NewExcludeFieldFilter([]tetragon.EventType{tetragon.EventType_PROCESS_EXEC}, []string{"process.pid"}, false)
filter.Filter(ev)
ev, _ = filter.Filter(ev)

assert.Empty(t, ev.GetProcessExec().Process.Pid)
}
Expand Down Expand Up @@ -225,7 +225,7 @@ func TestEmptyFieldFilter(t *testing.T) {
}

assert.True(t, proto.Equal(ev, expected), "events are equal before filter")
filter.Filter(ev)
ev, _ = filter.Filter(ev)
assert.True(t, proto.Equal(ev, expected), "events are equal after filter")
}

Expand All @@ -250,7 +250,7 @@ func TestFieldFilterInvertedEventSet(t *testing.T) {

filter := NewExcludeFieldFilter([]tetragon.EventType{tetragon.EventType_PROCESS_EXEC}, []string{"process", "parent"}, true)
assert.True(t, proto.Equal(ev, expected), "events are equal before filter")
filter.Filter(ev)
ev, _ = filter.Filter(ev)
assert.True(t, proto.Equal(ev, expected), "events are equal after filter")

ev = &tetragon.GetEventsResponse{
Expand All @@ -270,7 +270,7 @@ func TestFieldFilterInvertedEventSet(t *testing.T) {

filter = NewExcludeFieldFilter([]tetragon.EventType{tetragon.EventType_PROCESS_KPROBE}, []string{"process", "parent"}, true)
assert.False(t, proto.Equal(ev, expected), "events are not equal before filter")
filter.Filter(ev)
ev, _ = filter.Filter(ev)
assert.True(t, proto.Equal(ev, expected), "events are equal after filter")
}

Expand Down Expand Up @@ -599,8 +599,9 @@ func TestSlimExecEventsFieldFilterExample(t *testing.T) {
}

for _, filter := range filters {
for _, ev := range evs {
filter.Filter(ev)
for i, ev := range evs {
ev, _ = filter.Filter(ev)
evs[i] = ev
}
}
for i := range evs {
Expand Down
21 changes: 9 additions & 12 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/cilium/tetragon/pkg/tracingpolicy"
"github.com/cilium/tetragon/pkg/version"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
)

type Listener interface {
Expand Down Expand Up @@ -165,30 +164,28 @@ func (s *Server) GetEventsWG(request *tetragon.GetEventsRequest, server tetragon

// Filter the GetEventsResponse fields
filters := filters.FieldFiltersFromGetEventsRequest(request)
filterEvent := event
if len(filters) > 0 && filterEvent.GetProcessExec() != nil { // this is an exec event and we have fieldFilters
// We need a copy of the exec event as modifing the original message
// can cause issues in the process cache (we keep a copy of that message there).
filterEvent = proto.Clone(event).(*tetragon.GetEventsResponse)
}

for _, filter := range filters {
// we need not to change res
// maybe only for exec events
filter.Filter(filterEvent)
ev, err := filter.Filter(event)
if err != nil {
logger.GetLogger().WithField("filter", filter).WithError(err).Warn("Failed to apply field filter")
continue
}
event = ev
}

if aggregator != nil {
// Send event to aggregator.
select {
case aggregator.GetEventChannel() <- filterEvent:
case aggregator.GetEventChannel() <- event:
default:
logger.GetLogger().
WithField("request", request).
Warn("Aggregator buffer is full. Consider increasing AggregatorOptions.channel_buffer_size.")
}
} else {
// No need to aggregate. Directly send out the response.
if err = server.Send(filterEvent); err != nil {
if err = server.Send(event); err != nil {
s.ctxCleanupWG.Done()
return err
}
Expand Down
Loading