diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 73ce1b71152..4d04206faa5 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -381,6 +381,7 @@ /packages/universal_profiling_symbolizer @elastic/obs-ds-intake-services /packages/vectra_detect @elastic/security-service-integrations /packages/vsphere @elastic/obs-infraobs-integrations +/packages/websocket @elastic/security-service-integrations /packages/watchguard_firebox @elastic/sec-deployment-and-devices /packages/websphere_application_server @elastic/obs-infraobs-integrations /packages/windows @elastic/elastic-agent-data-plane @elastic/sec-windows-platform diff --git a/packages/websocket/_dev/build/build.yml b/packages/websocket/_dev/build/build.yml new file mode 100644 index 00000000000..1f4fa988f6e --- /dev/null +++ b/packages/websocket/_dev/build/build.yml @@ -0,0 +1,4 @@ +dependencies: + ecs: + reference: git@v8.11.0 + import_mappings: true diff --git a/packages/websocket/_dev/build/docs/README.md b/packages/websocket/_dev/build/docs/README.md new file mode 100644 index 00000000000..04c90b360a3 --- /dev/null +++ b/packages/websocket/_dev/build/docs/README.md @@ -0,0 +1,27 @@ +# Custom WebSocket Input + +The WebSocket input integration enables ingestion of real-time data from WebSocket servers. WebSockets provide a full-duplex communication channel over a single, long-lived connection, which makes them suitable for scenarios where low-latency data transmission is required. + +This input type connects to a WebSocket URL, listens for messages sent by the server, and processes these messages as they arrive. The data is then published to Elasticsearch, making it searchable and analyzable in near real-time. + +## Configuration + +The full documentation for configuring the WebSocket input can be found [here](https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-websocket.html). 
+ +To configure the WebSocket input, specify the connection URL and other optional parameters such as headers for authentication or protocol versions. Advanced options for connection handling, such as timeouts and subprotocols, can be configured in the "Advanced options" section. + +### Example Configuration + +Here is a basic example of how to configure the WebSocket input: + +![Configuration Page](../img/websocket_configuration.png) + +This configuration establishes a WebSocket connection to ws://localhost:443/v1/stream and uses basic authentication. + +## Data Processing + +The WebSocket input will consume messages from the server as they are transmitted. These messages are expected to be in a format that Filebeat can process, such as JSON. If the message format is different, you may need to define a processor to parse and structure the data before it is sent to Elasticsearch. + +## Connection Management + +The WebSocket input manages the connection to the WebSocket server, including automatically reconnecting if the connection is lost. The input does not maintain any state between restarts, so if the server sends historical data, it will be re-ingested upon reconnection. 
\ No newline at end of file diff --git a/packages/websocket/_dev/deploy/docker/docker-compose.yml b/packages/websocket/_dev/deploy/docker/docker-compose.yml new file mode 100644 index 00000000000..3f6c4a1e898 --- /dev/null +++ b/packages/websocket/_dev/deploy/docker/docker-compose.yml @@ -0,0 +1,15 @@ +version: "2.3" +services: + websocket-mock-service: + image: golang:1.21-alpine + hostname: websocket + working_dir: /app + volumes: + - ./websocket-mock-service:/app + ports: + - "3000:3000" + healthcheck: + test: "wget --no-verbose --tries=1 --spider http://localhost:3000/health || exit 1" + interval: 10s + timeout: 5s + command: ["go", "run", "main.go"] diff --git a/packages/websocket/_dev/deploy/docker/websocket-mock-service/go.mod b/packages/websocket/_dev/deploy/docker/websocket-mock-service/go.mod new file mode 100644 index 00000000000..f86e58f7783 --- /dev/null +++ b/packages/websocket/_dev/deploy/docker/websocket-mock-service/go.mod @@ -0,0 +1,7 @@ +module websocket-mock-service + +go 1.21.3 + +require github.com/gorilla/websocket v1.5.1 + +require golang.org/x/net v0.17.0 // indirect diff --git a/packages/websocket/_dev/deploy/docker/websocket-mock-service/go.sum b/packages/websocket/_dev/deploy/docker/websocket-mock-service/go.sum new file mode 100644 index 00000000000..272772f063c --- /dev/null +++ b/packages/websocket/_dev/deploy/docker/websocket-mock-service/go.sum @@ -0,0 +1,4 @@ +github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= +github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= +golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= diff --git a/packages/websocket/_dev/deploy/docker/websocket-mock-service/main.go b/packages/websocket/_dev/deploy/docker/websocket-mock-service/main.go new file mode 100644 index 00000000000..393e7afd03e --- /dev/null +++ 
b/packages/websocket/_dev/deploy/docker/websocket-mock-service/main.go
@@ -0,0 +1,91 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+// Command websocket-mock-service is a mock WebSocket server: it answers a
+// health probe and serves canned JSON payloads on two test paths.
+package main
+
+import (
+	"log"
+	"net/http"
+
+	"github.com/gorilla/websocket"
+)
+
+// basicAuthHeader is the only Authorization header value accepted on
+// /testbasicauth; the token is the base64 encoding of "test:test".
+const basicAuthHeader = "Basic dGVzdDp0ZXN0"
+
+// main registers handleWebSocket for every path, listens on port 3000 and
+// terminates the process if the listener fails.
+func main() {
+	http.HandleFunc("/", handleWebSocket)
+	log.Fatal(http.ListenAndServe(":3000", nil))
+}
+
+// handleWebSocket routes a request by its URL path:
+//
+//   - /health answers 200 without upgrading the connection.
+//   - /testbasicauth requires basicAuthHeader and answers 401 otherwise.
+//   - /test needs no authentication.
+//   - any other path answers 404.
+//
+// For the two test paths the connection is upgraded to a WebSocket, a single
+// JSON message is written and the connection is closed.
+func handleWebSocket(w http.ResponseWriter, r *http.Request) {
+	var responseMessage []map[string]string
+
+	switch r.URL.Path {
+	case "/health":
+		w.WriteHeader(http.StatusOK)
+		return
+	case "/testbasicauth":
+		if r.Header.Get("Authorization") != basicAuthHeader {
+			w.WriteHeader(http.StatusUnauthorized)
+			if _, err := w.Write([]byte("Error: Authentication failed.")); err != nil {
+				log.Println("write:", err)
+			}
+			return
+		}
+		responseMessage = []map[string]string{
+			{
+				"message": "You are now authenticated to the WebSocket server.",
+			},
+		}
+	case "/test":
+		responseMessage = []map[string]string{
+			{
+				"ts":   "2024-01-01T01:00:00.000000-00:00",
+				"data": "testdata1",
+				"id":   "test1234567891",
+			},
+			{
+				"ts":   "2024-01-01T02:00:00.000000-00:00",
+				"data": "testdata2",
+				"id":   "test1234567890",
+			},
+		}
+	default:
+		// Reject unknown paths before upgrading; otherwise the client would
+		// be upgraded and sent a JSON null payload.
+		http.NotFound(w, r)
+		return
+	}
+
+	upgrader := websocket.Upgrader{
+		// Accept every origin: this server is a mock, not a production endpoint.
+		CheckOrigin: func(*http.Request) bool { return true },
+	}
+	conn, err := upgrader.Upgrade(w, r, nil)
+	if err != nil {
+		// Upgrade has already replied to the client with an HTTP error.
+		log.Println(err)
+		return
+	}
+	defer conn.Close()
+
+	if err := conn.WriteJSON(responseMessage); err != nil {
+		log.Println("write:", err)
+	}
+}
diff --git a/packages/websocket/_dev/test/system/test-auth-config.yml b/packages/websocket/_dev/test/system/test-auth-config.yml new file mode 100644 index 00000000000..f58cb933997 --- /dev/null +++ b/packages/websocket/_dev/test/system/test-auth-config.yml @@ -0,0 +1,9 @@ +input: websocket +service: websocket-mock-service +vars: + url: ws://{{Hostname}}:{{Port}}/testbasicauth + basic_token: "dGVzdDp0ZXN0" + program: | + bytes(state.response).decode_json().as(body,{ + "events": body, + }) diff --git a/packages/websocket/_dev/test/system/test-get-config.yml b/packages/websocket/_dev/test/system/test-get-config.yml new file mode 100644 index 00000000000..2bea317e9bf --- /dev/null +++ b/packages/websocket/_dev/test/system/test-get-config.yml @@ -0,0 +1,13 @@ +input: websocket +service: websocket-mock-service +vars: + url: ws://{{Hostname}}:{{Port}}/test + program: | + bytes(state.response).decode_json().as(body,{ + "events": body.map(e, { + "message": e.encode_json(), + }), + "cursor": { + "max_ts": body.map(e, e.ts).max() + } + }) diff --git a/packages/websocket/agent/input/websocket.yml.hbs b/packages/websocket/agent/input/websocket.yml.hbs new file mode 100644 index 00000000000..dcd6ffeea01 --- /dev/null +++ b/packages/websocket/agent/input/websocket.yml.hbs @@ -0,0 +1,60 @@ +data_stream: + dataset: {{data_stream.dataset}} +{{#if ssl}} +resource.ssl: {{ssl}} +{{/if}} +url: {{url}} + +program: {{escape_string program}} + +{{#if state}} +state: + {{state}} +{{/if}} +redact.delete: {{delete_redacted_fields}} +{{#if redact_fields}} +redact.fields: +{{#each redact_fields as |field|}} + - {{field}} +{{/each}} +{{/if}} + +{{#if regexp}} +regexp: + {{regexp}} +{{/if}} + +{{#if basic_token}} +auth.basic_token: {{basic_token}} +{{else if bearer_token}} +auth.bearer_token: {{bearer_token}} +{{/if}} + +{{#unless basic_token}} 
+{{#unless bearer_token}} +{{#if header_key}} +auth.custom.header: {{header_key}} +{{/if}} +{{#if header_value}} +auth.custom.value: {{header_value}} +{{/if}} +{{/unless}} +{{/unless}} + +{{#if pipeline}} +pipeline: {{pipeline}} +{{/if}} + +{{#if tags}} +tags: +{{#each tags as |tag|}} + - {{tag}} +{{/each}} +{{/if}} +{{#contains "forwarded" tags}} +publisher_pipeline.disable_host: true +{{/contains}} +{{#if processors}} +processors: +{{processors}} +{{/if}} diff --git a/packages/websocket/changelog.yml b/packages/websocket/changelog.yml new file mode 100644 index 00000000000..65f09b3b363 --- /dev/null +++ b/packages/websocket/changelog.yml @@ -0,0 +1,6 @@ +# newer versions go on top +- version: "0.1.0" + changes: + - description: Initial Implementation. + type: enhancement + link: https://github.com/elastic/integrations/pull/9926 diff --git a/packages/websocket/docs/README.md b/packages/websocket/docs/README.md new file mode 100644 index 00000000000..04c90b360a3 --- /dev/null +++ b/packages/websocket/docs/README.md @@ -0,0 +1,27 @@ +# Custom WebSocket Input + +The WebSocket input integration enables ingestion of real-time data from WebSocket servers. WebSockets provide a full-duplex communication channel over a single, long-lived connection, which makes them suitable for scenarios where low-latency data transmission is required. + +This input type connects to a WebSocket URL, listens for messages sent by the server, and processes these messages as they arrive. The data is then published to Elasticsearch, making it searchable and analyzable in near real-time. + +## Configuration + +The full documentation for configuring the WebSocket input can be found [here](https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-websocket.html). + +To configure the WebSocket input, specify the connection URL and other optional parameters such as headers for authentication or protocol versions. 
Advanced options for connection handling, such as timeouts and subprotocols, can be configured in the "Advanced options" section. + +### Example Configuration + +Here is a basic example of how to configure the WebSocket input: + +![Configuration Page](../img/websocket_configuration.png) + +This configuration establishes a WebSocket connection to ws://localhost:443/v1/stream and uses basic authentication. + +## Data Processing + +The WebSocket input will consume messages from the server as they are transmitted. These messages are expected to be in a format that Filebeat can process, such as JSON. If the message format is different, you may need to define a processor to parse and structure the data before it is sent to Elasticsearch. + +## Connection Management + +The WebSocket input manages the connection to the WebSocket server, including automatically reconnecting if the connection is lost. The input does not maintain any state between restarts, so if the server sends historical data, it will be re-ingested upon reconnection. \ No newline at end of file diff --git a/packages/websocket/fields/base-fields.yml b/packages/websocket/fields/base-fields.yml new file mode 100644 index 00000000000..3ca883eb577 --- /dev/null +++ b/packages/websocket/fields/base-fields.yml @@ -0,0 +1,16 @@ +- name: data_stream.type + type: constant_keyword + description: Data stream type. +- name: data_stream.dataset + type: constant_keyword + description: Data stream dataset. +- name: data_stream.namespace + type: constant_keyword + description: Data stream namespace. +- name: event.module + type: constant_keyword + description: Event module. + value: websocket +- name: '@timestamp' + type: date + description: Event timestamp. 
diff --git a/packages/websocket/fields/beats.yml b/packages/websocket/fields/beats.yml new file mode 100644 index 00000000000..b3701b581cf --- /dev/null +++ b/packages/websocket/fields/beats.yml @@ -0,0 +1,9 @@ +- name: input.type + type: keyword + description: Type of filebeat input. +- name: log.offset + type: long + description: Log offset. +- name: tags + type: keyword + description: User defined tags. diff --git a/packages/websocket/img/websocket_configuration.png b/packages/websocket/img/websocket_configuration.png new file mode 100644 index 00000000000..6ed69bf08cd Binary files /dev/null and b/packages/websocket/img/websocket_configuration.png differ diff --git a/packages/websocket/manifest.yml b/packages/websocket/manifest.yml new file mode 100644 index 00000000000..ea7820bd1d4 --- /dev/null +++ b/packages/websocket/manifest.yml @@ -0,0 +1,148 @@ +format_version: 3.0.2 +name: websocket +title: Custom Websocket logs +version: 0.1.0 +description: Collect custom events from a socket server with Elastic agent. +type: input +categories: + - custom +conditions: + kibana: + version: "^8.13.0" + elastic: + subscription: "basic" +policy_templates: + - name: websocket + title: Websocket Custom Socket Input + description: Collect custom data from websocket server. + input: websocket + type: logs + template_path: websocket.yml.hbs + vars: + - name: data_stream.dataset + type: text + title: Dataset name + description: | + Dataset to write data to. Changing the dataset will send the data to a different index. You can't use `-` in the name of a dataset and only valid characters for [Elasticsearch index names](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html). + default: websocket.websocket + required: true + show_user: true + - name: url + type: text + title: URL + description: URL of the Websocket Server. 
+ required: true + show_user: true + - name: pipeline + type: text + title: Ingest Pipeline + description: | + The Ingest Node pipeline ID to be used by the integration. + required: false + show_user: true + - name: program + type: textarea + title: The CEL program to be run for each message. + description: | + Program is the CEL program that is executed on each message received. + More information can be found in the [documentation](https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-websocket.html#_execution_2). + show_user: true + multi: false + required: true + default: | + # // Read and process simple websocket messages from a local websocket server. + # bytes(state.response).decode_json().as(inner_body,{ + # "events": inner_body.encode_json(), + # }) + - name: state + type: yaml + title: Initial CEL evaluation state + description: | + State is the initial state to be provided to the program. If it has a cursor field, that field will be overwritten by any stored cursor, but will be available if no stored cursor exists. + More information can be found in the [documentation](https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-websocket.html#state-websocket). + show_user: true + multi: false + required: false + - name: regexp + type: yaml + title: Defined Regular Expressions + description: | + Regexps is the set of regular expressions to be made available to the program by name. The syntax used is [RE2](https://github.com/google/re2/wiki/Syntax). + More information can be found in the [documentation](https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-websocket.html#regexp-websocket). 
+ show_user: true + multi: false + required: false + default: | + #products: '(?i)(Elasticsearch|Beats|Logstash|Kibana)' + #solutions: '(?i)(Search|Observability|Security)' + - name: basic_token + type: password + title: Basic Auth Token + show_user: true + required: false + description: The token to be used for Basic Authentication. + secret: true + - name: bearer_token + type: password + title: Bearer Auth Token + show_user: true + required: false + description: The token to be used for Bearer Authentication. + secret: true + - name: header_key + type: text + title: Custom Header Key + show_user: true + required: false + description: The custom header key name for Authentication. + - name: header_value + type: password + title: Custom Header Value + show_user: true + required: false + description: The custom header value for Authentication. + secret: true + - name: redact_fields + type: text + title: Redacted fields + description: | + Fields to redact in debug logs. When logging at debug-level the input state and CEL evaluation state are included + in logs. This may leak secrets, so list sensitive state fields in this configuration. + show_user: true + multi: true + required: false + - name: delete_redacted_fields + type: bool + title: Delete redacted fields + description: | + The default behavior for field redaction is to replace characters with `*`s. If field value length or presence will + leak information, the fields can be deleted from logging by setting this configuration to true. + show_user: true + multi: false + default: false + required: true + - name: ssl + type: yaml + title: SSL Configuration + description: e.g. certificate_authorities, supported_protocols, verification_mode, etc. More examples can be found in the [documentation](https://www.elastic.co/guide/en/beats/filebeat/current/configuration-ssl.html#ssl-common-config). 
+ multi: false + required: false + show_user: false + - name: processors + type: yaml + title: Processors + multi: false + required: false + show_user: false + description: >- + Processors are used to reduce the number of fields in the exported event or to enhance the event with metadata. This executes in the agent before the logs are parsed. See [Processors](https://www.elastic.co/guide/en/beats/filebeat/current/filtering-and-enhancing-data.html) for details. + - name: tags + type: text + title: Tags + multi: true + show_user: false + default: + - forwarded +owner: + github: elastic/security-service-integrations + type: elastic diff --git a/packages/websocket/sample_event.json b/packages/websocket/sample_event.json new file mode 100644 index 00000000000..9043ab73452 --- /dev/null +++ b/packages/websocket/sample_event.json @@ -0,0 +1,35 @@ +{ + "@timestamp": "2024-07-08T09:25:41.330Z", + "agent": { + "ephemeral_id": "1fdf894d-9ddc-4c11-bf25-1970712a0bd9", + "id": "d9b59d61-0816-4bba-a760-819d10d05da0", + "name": "docker-fleet-agent", + "type": "filebeat", + "version": "8.13.0" + }, + "data_stream": { + "dataset": "websocket.websocket", + "namespace": "ep", + "type": "logs" + }, + "ecs": { + "version": "8.0.0" + }, + "elastic_agent": { + "id": "d9b59d61-0816-4bba-a760-819d10d05da0", + "snapshot": false, + "version": "8.13.0" + }, + "event": { + "agent_id_status": "verified", + "dataset": "websocket.websocket", + "ingested": "2024-07-08T09:25:53Z" + }, + "input": { + "type": "websocket" + }, + "message": "{\"data\":\"testdata1\",\"id\":\"test1234567891\",\"ts\":\"2024-01-01T01:00:00.000000-00:00\"}", + "tags": [ + "forwarded" + ] +} \ No newline at end of file diff --git a/packages/websocket/validation.yml b/packages/websocket/validation.yml new file mode 100644 index 00000000000..a96151416a6 --- /dev/null +++ b/packages/websocket/validation.yml @@ -0,0 +1,3 @@ +errors: + exclude_checks: + - SVR00005 # Kibana version for saved tags.