diff --git a/cmd/tetra/getevents/io_reader_client.go b/cmd/tetra/getevents/io_reader_client.go index c4943432457..f4e388f530e 100644 --- a/cmd/tetra/getevents/io_reader_client.go +++ b/cmd/tetra/getevents/io_reader_client.go @@ -16,7 +16,6 @@ import ( hubbleFilters "github.com/cilium/tetragon/pkg/oldhubble/filters" "google.golang.org/grpc" "google.golang.org/protobuf/encoding/protojson" - "google.golang.org/protobuf/proto" ) // ioReaderClient implements tetragon.FineGuidanceSensors_GetEventsClient. @@ -101,28 +100,23 @@ func (i *ioReaderClient) GetVersion(_ context.Context, _ *tetragon.GetVersionReq func (i *ioReaderClient) Recv() (*tetragon.GetEventsResponse, error) { for i.scanner.Scan() { - var res tetragon.GetEventsResponse + res := &tetragon.GetEventsResponse{} line := i.scanner.Bytes() - err := i.unmarshaller.Unmarshal(line, &res) + err := i.unmarshaller.Unmarshal(line, res) if err != nil && i.debug { fmt.Fprintf(os.Stderr, "DEBUG: failed unmarshal: %s: %s\n", line, err) continue } - if !hubbleFilters.Apply(i.allowlist, nil, &hubbleV1.Event{Event: &res}) { + if !hubbleFilters.Apply(i.allowlist, nil, &hubbleV1.Event{Event: res}) { continue } - filterEvent := &res - if len(i.fieldFilters) > 0 && filterEvent.GetProcessExec() != nil { // this is an exec event and we have fieldFilters - // We need a copy of the exec event as modifing the original message - // can cause issues in the process cache (we keep a copy of that message there). - filterEvent = proto.Clone(&res).(*tetragon.GetEventsResponse) - } for _, filter := range i.fieldFilters { - // we need not to change res - // maybe only for exec events - filter.Filter(filterEvent) + res, err = filter.Filter(res) + if err != nil { + return nil, err + } } - return filterEvent, nil + return res, nil } if err := i.scanner.Err(); err != nil { return nil, err diff --git a/pkg/filters/fields.go b/pkg/filters/fields.go index c9f6fd4b003..2b467ddad48 100644 --- a/pkg/filters/fields.go +++ b/pkg/filters/fields.go @@ -12,6 +12,7 @@ import ( "github.com/cilium/tetragon/api/v1/tetragon" "github.com/mennanov/fmutils" + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/reflect/protoreflect" ) @@ -144,7 +145,17 @@ func FieldFiltersFromGetEventsRequest(request *tetragon.GetEventsRequest) []*Fie // Filter filters the fields in the GetEventsResponse, keeping fields specified in the // inclusion filter and discarding fields specified in the exclusion filter. Exclusion // takes precedence over inclusion and an empty filter set will keep all remaining fields. -func (f *FieldFilter) Filter(event *tetragon.GetEventsResponse) error { +func (f *FieldFilter) Filter(event *tetragon.GetEventsResponse) (*tetragon.GetEventsResponse, error) { + // We need to deep copy the event here to avoid issues caused by filtering out + // information that is shared between events through the event cache (e.g. process + // info). This can cause segmentation faults and other nasty bugs. Avoid all that by + // doing a deep copy here before filtering. + // + // FIXME: We need to fix this so that it doesn't kill performance by doing a deep + // copy. This will require architectural changes to both the field filters and the + // event cache. + event = proto.Clone(event).(*tetragon.GetEventsResponse) + if len(f.eventSet) > 0 { // skip filtering by default unless the event set is inverted, in which case we // want to filter by default and skip only if we have a match @@ -170,7 +181,7 @@ func (f *FieldFilter) Filter(event *tetragon.GetEventsResponse) error { } if skipFiltering { - return nil + return event, nil } } @@ -190,8 +201,8 @@ func (f *FieldFilter) Filter(event *tetragon.GetEventsResponse) error { }) if !rft.IsValid() { - return fmt.Errorf("invalid event after field filter") + return nil, fmt.Errorf("invalid event after field filter") } - return nil + return event, nil } diff --git a/pkg/filters/fields_test.go b/pkg/filters/fields_test.go index 2cac85aa9f8..2de7a0cb022 100644 --- a/pkg/filters/fields_test.go +++ b/pkg/filters/fields_test.go @@ -94,7 +94,7 @@ func TestEventFieldFilters(t *testing.T) { // Construct the filter filters := FieldFiltersFromGetEventsRequest(request) for _, filter := range filters { - filter.Filter(ev) + ev, _ = filter.Filter(ev) } // These fields should all have been included and so should not be empty @@ -125,12 +125,12 @@ func TestFieldFilterByEventType(t *testing.T) { } filter := NewExcludeFieldFilter([]tetragon.EventType{tetragon.EventType_PROCESS_EXIT}, []string{"process.pid"}, false) - filter.Filter(ev) + ev, _ = filter.Filter(ev) assert.NotEmpty(t, ev.GetProcessExec().Process.Pid) filter = NewExcludeFieldFilter([]tetragon.EventType{tetragon.EventType_PROCESS_EXEC}, []string{"process.pid"}, false) - filter.Filter(ev) + ev, _ = filter.Filter(ev) assert.Empty(t, ev.GetProcessExec().Process.Pid) } @@ -225,7 +225,7 @@ func TestEmptyFieldFilter(t *testing.T) { } assert.True(t, proto.Equal(ev, expected), "events are equal before filter") - filter.Filter(ev) + ev, _ = filter.Filter(ev) assert.True(t, proto.Equal(ev, expected), "events are equal after filter") } @@ -250,7 +250,7 @@ func TestFieldFilterInvertedEventSet(t *testing.T) { filter := NewExcludeFieldFilter([]tetragon.EventType{tetragon.EventType_PROCESS_EXEC}, []string{"process", "parent"}, true) assert.True(t, proto.Equal(ev, expected), "events are equal before filter") - filter.Filter(ev) + ev, _ = filter.Filter(ev) assert.True(t, proto.Equal(ev, expected), "events are equal after filter") ev = &tetragon.GetEventsResponse{ @@ -270,7 +270,7 @@ func TestFieldFilterInvertedEventSet(t *testing.T) { filter = NewExcludeFieldFilter([]tetragon.EventType{tetragon.EventType_PROCESS_KPROBE}, []string{"process", "parent"}, true) assert.False(t, proto.Equal(ev, expected), "events are not equal before filter") - filter.Filter(ev) + ev, _ = filter.Filter(ev) assert.True(t, proto.Equal(ev, expected), "events are equal after filter") } @@ -599,8 +599,9 @@ func TestSlimExecEventsFieldFilterExample(t *testing.T) { } for _, filter := range filters { - for _, ev := range evs { - filter.Filter(ev) + for i, ev := range evs { + ev, _ = filter.Filter(ev) + evs[i] = ev } } for i := range evs { diff --git a/pkg/server/server.go b/pkg/server/server.go index 40cab9fc488..4e53d8949a6 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -22,7 +22,6 @@ import ( "github.com/cilium/tetragon/pkg/tracingpolicy" "github.com/cilium/tetragon/pkg/version" "github.com/sirupsen/logrus" - "google.golang.org/protobuf/proto" ) type Listener interface { @@ -165,22 +164,20 @@ func (s *Server) GetEventsWG(request *tetragon.GetEventsRequest, server tetragon // Filter the GetEventsResponse fields filters := filters.FieldFiltersFromGetEventsRequest(request) - filterEvent := event - if len(filters) > 0 && filterEvent.GetProcessExec() != nil { // this is an exec event and we have fieldFilters - // We need a copy of the exec event as modifing the original message - // can cause issues in the process cache (we keep a copy of that message there). - filterEvent = proto.Clone(event).(*tetragon.GetEventsResponse) - } + for _, filter := range filters { - // we need not to change res - // maybe only for exec events - filter.Filter(filterEvent) + ev, err := filter.Filter(event) + if err != nil { + logger.GetLogger().WithField("filter", filter).WithError(err).Warn("Failed to apply field filter") + continue + } + event = ev } if aggregator != nil { // Send event to aggregator. select { - case aggregator.GetEventChannel() <- filterEvent: + case aggregator.GetEventChannel() <- event: default: logger.GetLogger(). WithField("request", request). @@ -188,7 +185,7 @@ func (s *Server) GetEventsWG(request *tetragon.GetEventsRequest, server tetragon } } else { // No need to aggregate. Directly send out the response. - if err = server.Send(filterEvent); err != nil { + if err = server.Send(event); err != nil { s.ctxCleanupWG.Done() return err }