Skip to content

Commit

Permalink
Merge pull request #32 from ti-mo/sink-elastic-upsert
Browse files Browse the repository at this point in the history
elasticsearch - change from sampling to upserting single flow documents
  • Loading branch information
ti-mo authored Apr 20, 2020
2 parents 8c74a2c + 0f59ee9 commit 4ff121a
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 47 deletions.
10 changes: 5 additions & 5 deletions bpf/acct.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@
struct acct_event_t {
u64 start;
u64 ts;
u32 cid;
u32 connmark;
u64 cptr;
union nf_inet_addr srcaddr;
union nf_inet_addr dstaddr;
u64 packets_orig;
u64 bytes_orig;
u64 packets_ret;
u64 bytes_ret;
u32 connmark;
u32 netns;
u16 srcport;
u16 dstport;
u32 netns;
u8 proto;
};

Expand Down Expand Up @@ -390,7 +390,7 @@ int kretprobe____nf_ct_refresh_acct(struct pt_regs *ctx) {
struct acct_event_t data = {
.start = 0,
.ts = ts,
.cid = (u32)ct,
.cptr = (u64)ct,
};

// Pull counters onto the BPF stack first, so that we can make event rate
Expand Down Expand Up @@ -452,7 +452,7 @@ int kprobe__nf_conntrack_free(struct pt_regs *ctx) {
struct acct_event_t data = {
.start = 0,
.ts = ts,
.cid = (u32)ct,
.cptr = (u64)ct,
};

// Ignore the event if the nf_conn doesn't contain counters.
Expand Down
12 changes: 6 additions & 6 deletions configs/conntracct.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ sinks:

elastic:
type: elastic
address: "http://localhost:9200" # comma-separated list of elasticsearch server URLs
# database: conntracct # (default) index prefix (results in 'conntracct-YYYY.MM.DD')
# shards: 3 # (default) amount of shards to use for indices
# replicas: 0 # (default) amount of replicas to use for indices
# username: my-username # basic HTTP auth username
# password: my-password # basic HTTP auth password
address: "http://localhost:9200" # comma-separated list of elasticsearch server URLs
# database: conntracct-<hostname> # (default) index name
# shards: 3 # (default) index into this many shards
# replicas: 0 # (default) index into this many replicas
# username: my-username # basic HTTP auth username
# password: my-password # basic HTTP auth password

stdout:
type: stdout
Expand Down
18 changes: 11 additions & 7 deletions internal/sinks/elasticsearch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ package elasticsearch

import (
"context"
"fmt"
"time"
"strconv"

elastic "github.com/olivere/elastic/v7"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -68,16 +67,21 @@ func (s *ElasticSink) flushBatch() {
// backend in a single bulk transaction.
func (s *ElasticSink) sendBatch(b batch) {

// Append current date to the configured database name.
idx := fmt.Sprintf("%s-%s", s.config.Database, time.Now().Format("2006.01.02"))

// Create an elastic bulk request.
bulk := s.client.Bulk().Index(idx)
bulk := s.client.Bulk().Index(s.config.Database)

// Create index requests for each event in the batch.
reqs := make([]elastic.BulkableRequest, 0, len(b))
for _, e := range b {
reqs = append(reqs, elastic.NewBulkIndexRequest().Doc(e))
reqs = append(reqs,
elastic.NewBulkUpdateRequest().
// Use ES as a latest value store, update the flow's document
// with the latest counters on each incoming event.
DocAsUpsert(true).
Id(strconv.FormatUint(uint64(e.FlowID), 10)).
Doc(e).
RetryOnConflict(1),
)
}

// Add all index requests to the bulk request.
Expand Down
8 changes: 4 additions & 4 deletions internal/sinks/elasticsearch/elastic.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ func (s *ElasticSink) PushUpdate(e bpf.Event) {

// Wrap the BPF event in a structure to be inserted into the database.
ee := event{
EventType: "update",
Event: &e,
State: "established",
Event: &e,
}

s.transformEvent(&ee)
Expand All @@ -111,8 +111,8 @@ func (s *ElasticSink) PushDestroy(e bpf.Event) {

// Wrap the BPF event in a structure to be inserted into the database.
ee := event{
EventType: "destroy",
Event: &e,
State: "finished",
Event: &e,
}

s.transformEvent(&ee)
Expand Down
21 changes: 15 additions & 6 deletions internal/sinks/elasticsearch/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
// This structure is used to generate the JSON document
// sent to elasticsearch.
type event struct {
// Type of event, eg. 'update' or 'destroy'.
EventType string `json:"event_type"`
// State of the flow, eg. 'established' or 'finished'.
State string `json:"flow_state"`

// Hostname of the machine sending the event.
Hostname string `json:"hostname"`
Expand All @@ -36,14 +36,23 @@ func (s *ElasticSink) transformEvent(e *event) {
// TODO(timo): Allow the user to override the hostname.
e.Hostname, _ = os.Hostname()

// Convert the flow start timestamp to milliseconds.
// Apply boot time offset to the (relative) event timestamp, convert to milliseconds.
// Nanosecond-resolution unix timestamps cannot be ingested by elastic.
// https://github.com/elastic/elasticsearch/issues/43917
e.Start = e.Start / uint64(time.Millisecond)

// Apply boot time offset to the (relative) event timestamp, convert to milliseconds.
e.Timestamp = uint64(boottime.Absolute(int64(e.Timestamp)) / int64(time.Millisecond))

// Flows' start timestamps are only generated when the kernel marks them
// as 'CONFIRMED'. The first event will come with a zero start timestamp.
// For elastic, use the event's timestamp as an approximate start time
// since later events will upsert the actual start timestamp anyway.
// Under normal conditions, this should only be a couple microseconds off.
if e.Start == 0 {
e.Start = e.Timestamp
} else {
// Convert the flow start timestamp to milliseconds.
e.Start = e.Start / uint64(time.Millisecond)
}

// Calculated fields.
e.PacketsTotal = e.PacketsOrig + e.PacketsRet
e.BytesTotal = e.BytesOrig + e.BytesRet
Expand Down
7 changes: 6 additions & 1 deletion internal/sinks/elasticsearch/helpers.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package elasticsearch

import (
"os"
"strings"

"github.com/ti-mo/conntracct/internal/config"
Expand All @@ -17,7 +18,11 @@ func sinkDefaults(sc *config.SinkConfig) {
}

if sc.Database == "" {
sc.Database = "conntracct"
h, err := os.Hostname()
if err != nil {
panic(err)
}
sc.Database = "conntracct-" + h
}

if sc.BatchSize == 0 {
Expand Down
19 changes: 9 additions & 10 deletions pkg/bpf/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type Event struct {
NetNS uint32 `json:"netns"`
Proto uint8 `json:"proto"`

connectionID uint32
connPtr uint64
}

// unmarshalBinary unmarshals a slice of bytes received from the
Expand All @@ -50,8 +50,7 @@ func (e *Event) unmarshalBinary(b []byte) error {

e.Start = *(*uint64)(unsafe.Pointer(&b[0]))
e.Timestamp = *(*uint64)(unsafe.Pointer(&b[8]))
e.connectionID = *(*uint32)(unsafe.Pointer(&b[16]))
e.Connmark = *(*uint32)(unsafe.Pointer(&b[20]))
e.connPtr = *(*uint64)(unsafe.Pointer(&b[16]))

// Build an IPv4 address if only the first four bytes
// of the nf_inet_addr union are filled.
Expand All @@ -74,13 +73,14 @@ func (e *Event) unmarshalBinary(b []byte) error {
e.PacketsRet = *(*uint64)(unsafe.Pointer(&b[72]))
e.BytesRet = *(*uint64)(unsafe.Pointer(&b[80]))

e.Connmark = *(*uint32)(unsafe.Pointer(&b[88]))
e.NetNS = *(*uint32)(unsafe.Pointer(&b[92]))

// Only extract ports for UDP and TCP.
e.Proto = b[96]
e.Proto = b[100]
if e.Proto == 6 || e.Proto == 17 {
e.SrcPort = binary.BigEndian.Uint16(b[88:90])
e.DstPort = binary.BigEndian.Uint16(b[90:92])
e.SrcPort = binary.BigEndian.Uint16(b[96:98])
e.DstPort = binary.BigEndian.Uint16(b[98:100])
}

// Generate and set the Event's FlowID.
Expand Down Expand Up @@ -113,10 +113,9 @@ func (e *Event) hashFlow() uint32 {
// Protocol.
_, _ = h.Write([]byte{e.Proto})

b = make([]byte, 4)

// Connection ID.
binary.BigEndian.PutUint32(b, e.connectionID)
// nf_conn struct kernel pointer.
b = make([]byte, 8)
binary.LittleEndian.PutUint64(b, e.connPtr)
_, _ = h.Write(b)

// Calculate the hash.
Expand Down
14 changes: 7 additions & 7 deletions pkg/bpf/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ import (
func TestHashFlow(t *testing.T) {

e := Event{
SrcAddr: net.ParseIP("1.2.3.4"),
DstAddr: net.ParseIP("5.6.7.8"),
SrcPort: 1234,
DstPort: 5678,
Proto: 6,
connectionID: 11111111,
SrcAddr: net.ParseIP("1.2.3.4"),
DstAddr: net.ParseIP("5.6.7.8"),
SrcPort: 1234,
DstPort: 5678,
Proto: 6,
connPtr: 11111111111111111111,
}

assert.Equal(t, uint32(0x24846ab8), e.hashFlow())
assert.Equal(t, uint32(0x97c684), e.hashFlow())
}
2 changes: 1 addition & 1 deletion pkg/bpf/statik.go

Large diffs are not rendered by default.

0 comments on commit 4ff121a

Please sign in to comment.