Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

websocket: new generic integration #9926

Merged
merged 10 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@
/packages/universal_profiling_symbolizer @elastic/profiling
/packages/vectra_detect @elastic/security-service-integrations
/packages/vsphere @elastic/obs-infraobs-integrations
/packages/websocket @elastic/security-service-integrations
/packages/websphere_application_server @elastic/obs-infraobs-integrations
/packages/windows @elastic/elastic-agent-data-plane @elastic/sec-windows-platform
/packages/windows/data_stream/applocker_exe_and_dll @elastic/sec-windows-platform
Expand Down
4 changes: 4 additions & 0 deletions packages/websocket/_dev/build/build.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
dependencies:
ecs:
reference: git@v8.11.0
import_mappings: true
34 changes: 34 additions & 0 deletions packages/websocket/_dev/build/docs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# WebSocket Input Integration
kcreddy marked this conversation as resolved.
Show resolved Hide resolved

The WebSocket input integration enables ingestion of real-time data from WebSocket servers. WebSockets provide a full-duplex communication channel over a single, long-lived connection, which makes it suitable for scenarios where low latency data transmission is required.

This input type connects to a WebSocket URL, listens for messages sent by the server, and processes these messages as they arrive. The data is then published to Elasticsearch, making it searchable and analyzable in near real-time.

## Configuration

The full documentation for configuring the WebSocket input can be found [here](https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-websocket.html).

To configure the WebSocket input, specify the connection URL and other optional parameters such as headers for authentication or protocol versions. Advanced options for connection handling, such as timeouts and subprotocols, can be configured in the "Advanced options" section.

### Example Configuration

Here is a basic example of how to configure the WebSocket input:

```yaml
- type: websocket
url: "ws://websocket-server.example.com/stream"
headers:
Cookie: "session_id=abcdef1234567890"
```
ShourieG marked this conversation as resolved.
Show resolved Hide resolved

This configuration creates a WebSocket connection to ws://websocket-server.example.com/stream, using a session cookie for authentication.

## Data Processing

The WebSocket input will consume messages from the server as they are transmitted. These messages are expected to be in a format that Filebeat can process, such as JSON. If the message format is different, you may need to define a processor to parse and structure the data before it is sent to Elasticsearch.

**NOTE**: The websocket input as of now does not support XML messages.
ShourieG marked this conversation as resolved.
Show resolved Hide resolved

## Connection Management

The WebSocket input manages the connection to the WebSocket server, including automatically reconnecting if the connection is lost. The input does not maintain any state between restarts, so if the server sends historical data, it will be re-ingested upon reconnection.
11 changes: 11 additions & 0 deletions packages/websocket/_dev/deploy/docker/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
version: "2.3"
services:
websocket-mock-service:
image: golang:1.21-alpine
hostname: websocket
working_dir: /app
volumes:
- ./websocket-mock-service:/app
ports:
- "3000:3000"
command: ["go", "run", "main.go"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module websocket-mock-service

go 1.21.3

require github.com/gorilla/websocket v1.5.1

require golang.org/x/net v0.17.0 // indirect
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please go fmt this code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please reverse the order of function declarations. The convention in Go code is caller before callee.

Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package main
ShourieG marked this conversation as resolved.
Show resolved Hide resolved

import (
"log"
"net/http"
"github.com/gorilla/websocket"
)

var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}

func handleWebSocket(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
defer conn.Close()

// Send a message to the client upon successful WebSocket connection
err = conn.WriteJSON(map[string]string{
"message": "You are now connected to the WebSocket server.",
})
if err != nil {
log.Println("write:", err)
return
}
}

func main() {
http.HandleFunc("/", handleWebSocket)
log.Fatal(http.ListenAndServe(":3000", nil))
}
8 changes: 8 additions & 0 deletions packages/websocket/_dev/test/system/test-default-config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
input: websocket
service: websocket-mock-service
vars:
url: ws://{{Hostname}}:{{Port}}
program: |
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
bytes(state.response).decode_json().as(inner_body,{
"events": inner_body,
})
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
24 changes: 24 additions & 0 deletions packages/websocket/agent/input/websocket.yml.hbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
data_stream:
dataset: {{data_stream.dataset}}
{{#if ssl}}
resource.ssl: {{ssl}}
{{/if}}
url: {{url}}

program: {{escape_string program}}

{{#if pipeline}}
pipeline: {{pipeline}}
{{/if}}

kcreddy marked this conversation as resolved.
Show resolved Hide resolved
tags:
{{#each tags as |tag|}}
- {{tag}}
{{/each}}
{{#contains "forwarded" tags}}
publisher_pipeline.disable_host: true
{{/contains}}
{{#if processors}}
processors:
{{processors}}
{{/if}}
6 changes: 6 additions & 0 deletions packages/websocket/changelog.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# newer versions go on top
- version: "0.1.0"
changes:
- description: Initial Implementation.
type: enhancement
link: https://github.com/elastic/integrations/pull/9926
34 changes: 34 additions & 0 deletions packages/websocket/docs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# WebSocket Input Integration

The WebSocket input integration enables ingestion of real-time data from WebSocket servers. WebSockets provide a full-duplex communication channel over a single, long-lived connection, which makes it suitable for scenarios where low latency data transmission is required.

This input type connects to a WebSocket URL, listens for messages sent by the server, and processes these messages as they arrive. The data is then published to Elasticsearch, making it searchable and analyzable in near real-time.

## Configuration

The full documentation for configuring the WebSocket input can be found [here](https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-websocket.html).

To configure the WebSocket input, specify the connection URL and other optional parameters such as headers for authentication or protocol versions. Advanced options for connection handling, such as timeouts and subprotocols, can be configured in the "Advanced options" section.

### Example Configuration

Here is a basic example of how to configure the WebSocket input:

```yaml
- type: websocket
url: "ws://websocket-server.example.com/stream"
headers:
Cookie: "session_id=abcdef1234567890"
```

This configuration creates a WebSocket connection to ws://websocket-server.example.com/stream, using a session cookie for authentication.

## Data Processing

The WebSocket input will consume messages from the server as they are transmitted. These messages are expected to be in a format that Filebeat can process, such as JSON. If the message format is different, you may need to define a processor to parse and structure the data before it is sent to Elasticsearch.

**NOTE**: The websocket input as of now does not support XML messages.

## Connection Management

The WebSocket input manages the connection to the WebSocket server, including automatically reconnecting if the connection is lost. The input does not maintain any state between restarts, so if the server sends historical data, it will be re-ingested upon reconnection.
16 changes: 16 additions & 0 deletions packages/websocket/fields/base-fields.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
- name: data_stream.type
type: constant_keyword
description: Data stream type.
- name: data_stream.dataset
type: constant_keyword
description: Data stream dataset.
- name: data_stream.namespace
type: constant_keyword
description: Data stream namespace.
- name: event.module
type: constant_keyword
description: Event module.
value: websocket
- name: '@timestamp'
type: date
description: Event timestamp.
9 changes: 9 additions & 0 deletions packages/websocket/fields/beats.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
- name: input.type
type: keyword
description: Type of filebeat input.
- name: log.offset
type: long
description: Log offset.
- name: tags
type: keyword
description: User defined tags.
83 changes: 83 additions & 0 deletions packages/websocket/manifest.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
format_version: 3.0.2
name: websocket
title: Custom input using Websocket
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
version: 0.1.0
description: Collect custom events from a socket server with Elastic agent.
type: input
categories:
- custom
conditions:
kibana:
version: "^8.13.0"
elastic:
subscription: "basic"
policy_templates:
- name: websocket
title: Websocket Custom Socket Input
description: Collect custom data from websocket server.
input: websocket
type: logs
template_path: websocket.yml.hbs
vars:
- name: data_stream.dataset
type: text
title: Dataset name
description: |
Dataset to write data to. Changing the dataset will send the data to a different index. You can't use `-` in the name of a dataset and only valid characters for [Elasticsearch index names](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html).
default: websocket.websocket
required: true
show_user: true
- name: url
type: text
title: URL
description: URL of the Websocket Server.
required: true
show_user: true
default: ws://localhost:443/v1/stream
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
- name: pipeline
type: text
title: Ingest Pipeline
description: |
The Ingest Node pipeline ID to be used by the integration.
required: false
show_user: true
- name: program
type: textarea
title: The CEL program to be run for each message.
description: |
Program is the The CEL program that is executed on each message received.
More information can be found in the [documentation](https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-websocket.html#_execution_2).
show_user: true
multi: false
required: true
default: |
# // Read and process simple websocket messages from a local websocket server.
# bytes(state.response).decode_json().as(inner_body,{
# "events": inner_body.encode_json(),
# })
- name: ssl
type: yaml
title: SSL Configuration
description: i.e. certificate_authorities, supported_protocols, verification_mode etc, more examples found in the [documentation](https://www.elastic.co/guide/en/beats/filebeat/current/configuration-ssl.html#ssl-common-config).
multi: false
required: false
show_user: false
- name: processors
type: yaml
title: Processors
multi: false
required: false
show_user: false
description: >
Processors are used to reduce the number of fields in the exported event or to enhance the event with metadata. This executes in the agent before the logs are parsed. See [Processors](https://www.elastic.co/guide/en/beats/filebeat/current/filtering-and-enhancing-data.html) for details.

- name: tags
type: text
title: Tags
multi: true
show_user: false
default:
- forwarded
owner:
github: elastic/security-service-integrations
type: elastic
35 changes: 35 additions & 0 deletions packages/websocket/sample_event.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
{
"@timestamp": "2024-05-21T07:38:04.940Z",
"agent": {
"ephemeral_id": "5fbf1aac-060f-4922-af3d-ce533172df31",
"id": "1d01535d-afdc-48ae-92f9-e36340b184ad",
"name": "docker-fleet-agent",
"type": "filebeat",
"version": "8.13.0"
},
"data_stream": {
"dataset": "websocket.websocket",
"namespace": "ep",
"type": "logs"
},
"ecs": {
"version": "8.0.0"
},
"elastic_agent": {
"id": "1d01535d-afdc-48ae-92f9-e36340b184ad",
"snapshot": false,
"version": "8.13.0"
},
"event": {
"agent_id_status": "verified",
"dataset": "websocket.websocket",
"ingested": "2024-05-21T07:38:16Z"
},
"input": {
"type": "websocket"
},
"message": "You are now connected to the WebSocket server.",
"tags": [
"forwarded"
]
}
3 changes: 3 additions & 0 deletions packages/websocket/validation.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
errors:
exclude_checks:
- SVR00005 # Kibana version for saved tags.