From cebf6872ddd7adb299c800fb9e0790b9a41463e9 Mon Sep 17 00:00:00 2001 From: William Findlay Date: Fri, 3 Nov 2023 14:48:26 -0400 Subject: [PATCH] filters/fields: do a deep copy before filtering [upstream commit 09de1f83eb12ec13030be275c0395600b67d9644] We need to deep copy the event here to avoid issues caused by filtering out information that is shared between events through the event cache (e.g. process info). This can cause segmentation faults and other nasty bugs. Avoid all that by doing a deep copy here before filtering. This is not pretty or great for performance, but it at least works as a stopgap until we're able to refactor things so that it's no longer necessary. Signed-off-by: William Findlay --- cmd/tetra/getevents/io_reader_client.go | 22 ++++++++-------------- pkg/filters/fields.go | 19 +++++++++++++++---- pkg/filters/fields_test.go | 17 +++++++++-------- pkg/server/server.go | 21 +++++++++------------ 4 files changed, 41 insertions(+), 38 deletions(-) diff --git a/cmd/tetra/getevents/io_reader_client.go b/cmd/tetra/getevents/io_reader_client.go index c4943432457..f4e388f530e 100644 --- a/cmd/tetra/getevents/io_reader_client.go +++ b/cmd/tetra/getevents/io_reader_client.go @@ -16,7 +16,6 @@ import ( hubbleFilters "github.com/cilium/tetragon/pkg/oldhubble/filters" "google.golang.org/grpc" "google.golang.org/protobuf/encoding/protojson" - "google.golang.org/protobuf/proto" ) // ioReaderClient implements tetragon.FineGuidanceSensors_GetEventsClient. @@ -101,28 +100,23 @@ func (i *ioReaderClient) GetVersion(_ context.Context, _ *tetragon.GetVersionReq func (i *ioReaderClient) Recv() (*tetragon.GetEventsResponse, error) { for i.scanner.Scan() { - var res tetragon.GetEventsResponse + res := &tetragon.GetEventsResponse{} line := i.scanner.Bytes() - err := i.unmarshaller.Unmarshal(line, &res) + err := i.unmarshaller.Unmarshal(line, res) if err != nil && i.debug { fmt.Fprintf(os.Stderr, "DEBUG: failed unmarshal: %s: %s\n", line, err) continue } - if !hubbleFilters.Apply(i.allowlist, nil, &hubbleV1.Event{Event: &res}) { + if !hubbleFilters.Apply(i.allowlist, nil, &hubbleV1.Event{Event: res}) { continue } - filterEvent := &res - if len(i.fieldFilters) > 0 && filterEvent.GetProcessExec() != nil { // this is an exec event and we have fieldFilters - // We need a copy of the exec event as modifing the original message - // can cause issues in the process cache (we keep a copy of that message there). - filterEvent = proto.Clone(&res).(*tetragon.GetEventsResponse) - } for _, filter := range i.fieldFilters { - // we need not to change res - // maybe only for exec events - filter.Filter(filterEvent) + res, err = filter.Filter(res) + if err != nil { + return nil, err + } } - return filterEvent, nil + return res, nil } if err := i.scanner.Err(); err != nil { return nil, err diff --git a/pkg/filters/fields.go b/pkg/filters/fields.go index c9f6fd4b003..2b467ddad48 100644 --- a/pkg/filters/fields.go +++ b/pkg/filters/fields.go @@ -12,6 +12,7 @@ import ( "github.com/cilium/tetragon/api/v1/tetragon" "github.com/mennanov/fmutils" + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/reflect/protoreflect" ) @@ -144,7 +145,17 @@ func FieldFiltersFromGetEventsRequest(request *tetragon.GetEventsRequest) []*Fie // Filter filters the fields in the GetEventsResponse, keeping fields specified in the // inclusion filter and discarding fields specified in the exclusion filter. Exclusion // takes precedence over inclusion and an empty filter set will keep all remaining fields. -func (f *FieldFilter) Filter(event *tetragon.GetEventsResponse) error { +func (f *FieldFilter) Filter(event *tetragon.GetEventsResponse) (*tetragon.GetEventsResponse, error) { + // We need to deep copy the event here to avoid issues caused by filtering out + // information that is shared between events through the event cache (e.g. process + // info). This can cause segmentation faults and other nasty bugs. Avoid all that by + // doing a deep copy here before filtering. + // + // FIXME: We need to fix this so that it doesn't kill performance by doing a deep + // copy. This will require architectural changes to both the field filters and the + // event cache. + event = proto.Clone(event).(*tetragon.GetEventsResponse) + if len(f.eventSet) > 0 { // skip filtering by default unless the event set is inverted, in which case we // want to filter by default and skip only if we have a match @@ -170,7 +181,7 @@ func (f *FieldFilter) Filter(event *tetragon.GetEventsResponse) error { } if skipFiltering { - return nil + return event, nil } } @@ -190,8 +201,8 @@ func (f *FieldFilter) Filter(event *tetragon.GetEventsResponse) error { }) if !rft.IsValid() { - return fmt.Errorf("invalid event after field filter") + return nil, fmt.Errorf("invalid event after field filter") } - return nil + return event, nil } diff --git a/pkg/filters/fields_test.go b/pkg/filters/fields_test.go index 2cac85aa9f8..2de7a0cb022 100644 --- a/pkg/filters/fields_test.go +++ b/pkg/filters/fields_test.go @@ -94,7 +94,7 @@ func TestEventFieldFilters(t *testing.T) { // Construct the filter filters := FieldFiltersFromGetEventsRequest(request) for _, filter := range filters { - filter.Filter(ev) + ev, _ = filter.Filter(ev) } // These fields should all have been included and so should not be empty @@ -125,12 +125,12 @@ func TestFieldFilterByEventType(t *testing.T) { } filter := NewExcludeFieldFilter([]tetragon.EventType{tetragon.EventType_PROCESS_EXIT}, []string{"process.pid"}, false) - filter.Filter(ev) + ev, _ = filter.Filter(ev) assert.NotEmpty(t, ev.GetProcessExec().Process.Pid) filter = NewExcludeFieldFilter([]tetragon.EventType{tetragon.EventType_PROCESS_EXEC}, []string{"process.pid"}, false) - filter.Filter(ev) + ev, _ = filter.Filter(ev) assert.Empty(t, ev.GetProcessExec().Process.Pid) } @@ -225,7 +225,7 @@ func TestEmptyFieldFilter(t *testing.T) { } assert.True(t, proto.Equal(ev, expected), "events are equal before filter") - filter.Filter(ev) + ev, _ = filter.Filter(ev) assert.True(t, proto.Equal(ev, expected), "events are equal after filter") } @@ -250,7 +250,7 @@ func TestFieldFilterInvertedEventSet(t *testing.T) { filter := NewExcludeFieldFilter([]tetragon.EventType{tetragon.EventType_PROCESS_EXEC}, []string{"process", "parent"}, true) assert.True(t, proto.Equal(ev, expected), "events are equal before filter") - filter.Filter(ev) + ev, _ = filter.Filter(ev) assert.True(t, proto.Equal(ev, expected), "events are equal after filter") ev = &tetragon.GetEventsResponse{ @@ -270,7 +270,7 @@ func TestFieldFilterInvertedEventSet(t *testing.T) { filter = NewExcludeFieldFilter([]tetragon.EventType{tetragon.EventType_PROCESS_KPROBE}, []string{"process", "parent"}, true) assert.False(t, proto.Equal(ev, expected), "events are not equal before filter") - filter.Filter(ev) + ev, _ = filter.Filter(ev) assert.True(t, proto.Equal(ev, expected), "events are equal after filter") } @@ -599,8 +599,9 @@ func TestSlimExecEventsFieldFilterExample(t *testing.T) { } for _, filter := range filters { - for _, ev := range evs { - filter.Filter(ev) + for i, ev := range evs { + ev, _ = filter.Filter(ev) + evs[i] = ev } } for i := range evs { diff --git a/pkg/server/server.go b/pkg/server/server.go index 40cab9fc488..4e53d8949a6 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -22,7 +22,6 @@ import ( "github.com/cilium/tetragon/pkg/tracingpolicy" "github.com/cilium/tetragon/pkg/version" "github.com/sirupsen/logrus" - "google.golang.org/protobuf/proto" ) type Listener interface { @@ -165,22 +164,20 @@ func (s *Server) GetEventsWG(request *tetragon.GetEventsRequest, server tetragon // Filter the GetEventsResponse fields filters := filters.FieldFiltersFromGetEventsRequest(request) - filterEvent := event - if len(filters) > 0 && filterEvent.GetProcessExec() != nil { // this is an exec event and we have fieldFilters - // We need a copy of the exec event as modifing the original message - // can cause issues in the process cache (we keep a copy of that message there). - filterEvent = proto.Clone(event).(*tetragon.GetEventsResponse) - } + for _, filter := range filters { - // we need not to change res - // maybe only for exec events - filter.Filter(filterEvent) + ev, err := filter.Filter(event) + if err != nil { + logger.GetLogger().WithField("filter", filter).WithError(err).Warn("Failed to apply field filter") + continue + } + event = ev } if aggregator != nil { // Send event to aggregator. select { - case aggregator.GetEventChannel() <- filterEvent: + case aggregator.GetEventChannel() <- event: default: logger.GetLogger(). WithField("request", request). @@ -188,7 +185,7 @@ func (s *Server) GetEventsWG(request *tetragon.GetEventsRequest, server tetragon } } else { // No need to aggregate. Directly send out the response. - if err = server.Send(filterEvent); err != nil { + if err = server.Send(event); err != nil { s.ctxCleanupWG.Done() return err }