From d48c5940adc08cb802dfd4630d92e161cc7db3e6 Mon Sep 17 00:00:00 2001 From: dmachard <5562930+dmachard@users.noreply.github.com> Date: Sat, 21 Dec 2024 09:29:27 +0100 Subject: [PATCH 1/2] feat(transformer): new rreordering transform to sort logs by timestamp --- pkgconfig/transformers.go | 5 ++ transformers/reordering.go | 117 ++++++++++++++++++++++++++++++++ transformers/reordering_test.go | 70 +++++++++++++++++++ transformers/transformers.go | 1 + 4 files changed, 193 insertions(+) create mode 100644 transformers/reordering.go create mode 100644 transformers/reordering_test.go diff --git a/pkgconfig/transformers.go b/pkgconfig/transformers.go index 52823d80..4bdd04da 100644 --- a/pkgconfig/transformers.go +++ b/pkgconfig/transformers.go @@ -103,6 +103,11 @@ type ConfigTransformers struct { WhiteDomainsFile string `yaml:"white-domains-file" default:""` PersistenceFile string `yaml:"persistence-file" default:""` } `yaml:"new-domain-tracker"` + Reordering struct { + Enable bool `yaml:"enable" default:"false"` + FlushInterval int `yaml:"flush-interval" default:"30"` + MaxBufferSize int `yaml:"max-buffer-size" default:"100"` + } `yaml:"reordering"` } func (c *ConfigTransformers) SetDefault() { diff --git a/transformers/reordering.go b/transformers/reordering.go new file mode 100644 index 00000000..ea020292 --- /dev/null +++ b/transformers/reordering.go @@ -0,0 +1,117 @@ +package transformers + +import ( + "sort" + "sync" + "time" + + "github.com/dmachard/go-dnscollector/dnsutils" + "github.com/dmachard/go-dnscollector/pkgconfig" + "github.com/dmachard/go-logger" +) + +type ReorderingTransform struct { + GenericTransformer + buffer []dnsutils.DNSMessage + mutex sync.Mutex + flushTicker *time.Ticker + stopChan chan struct{} + nextWorkers []chan dnsutils.DNSMessage +} + +// NewLogReorderTransform creates an instance of the transformer. +func NewReorderingTransform(config *pkgconfig.ConfigTransformers, logger *logger.Logger, name string, instance int, nextWorkers []chan dnsutils.DNSMessage) *ReorderingTransform { + t := &ReorderingTransform{ + GenericTransformer: NewTransformer(config, logger, "reordering", name, instance, nextWorkers), + stopChan: make(chan struct{}), + nextWorkers: nextWorkers, + } + + return t +} + +// GetTransforms returns the available subtransformations. +func (t *ReorderingTransform) GetTransforms() ([]Subtransform, error) { + subtransforms := []Subtransform{} + if t.config.Reordering.Enable { + subtransforms = append(subtransforms, Subtransform{name: "reordering:sort-by-timestamp", processFunc: t.ReorderLogs}) + // Start a goroutine to handle periodic flushing. + t.flushTicker = time.NewTicker(time.Duration(t.config.Reordering.FlushInterval) * time.Second) + t.buffer = make([]dnsutils.DNSMessage, t.config.Reordering.MaxBufferSize) + go t.flushPeriodically() + + } + return subtransforms, nil +} + +// ReorderLogs adds a log to the buffer and flushes if the buffer is full. +func (t *ReorderingTransform) ReorderLogs(dm *dnsutils.DNSMessage) (int, error) { + t.mutex.Lock() + defer t.mutex.Unlock() + + // Add the log to the buffer. + t.buffer = append(t.buffer, *dm) + + // If the buffer exceeds a certain size, flush it. + const bufferSize = 100 + if len(t.buffer) >= bufferSize { + go t.flushBuffer() + } + + return ReturnKeep, nil +} + +// Close stops the periodic flushing. +func (t *ReorderingTransform) Reset() { + close(t.stopChan) +} + +// flushPeriodically periodically flushes the buffer based on a timer. +func (t *ReorderingTransform) flushPeriodically() { + for { + select { + case <-t.flushTicker.C: + t.flushBuffer() + case <-t.stopChan: + t.flushTicker.Stop() + return + } + } +} + +// flushBuffer sorts and sends the logs in the buffer to the next workers. +func (t *ReorderingTransform) flushBuffer() { + t.mutex.Lock() + defer t.mutex.Unlock() + + if len(t.buffer) == 0 { + return + } + + // Sort the buffer by timestamp. + sort.SliceStable(t.buffer, func(i, j int) bool { + ti, err1 := time.Parse(time.RFC3339Nano, t.buffer[i].DNSTap.TimestampRFC3339) + tj, err2 := time.Parse(time.RFC3339Nano, t.buffer[j].DNSTap.TimestampRFC3339) + if err1 != nil || err2 != nil { + // If timestamps are invalid, maintain the original order. + return false + } + return ti.Before(tj) + }) + + // Send sorted logs to the next workers. + for _, sortedMsg := range t.buffer { + for _, worker := range t.nextWorkers { + // Non-blocking send to avoid worker congestion. + select { + case worker <- sortedMsg: + default: + // Log or handle if the worker channel is full. + t.logger.Info("Worker channel is full, dropping message") + } + } + } + + // Clear the buffer. + t.buffer = t.buffer[:0] +} diff --git a/transformers/reordering_test.go b/transformers/reordering_test.go new file mode 100644 index 00000000..87d7853f --- /dev/null +++ b/transformers/reordering_test.go @@ -0,0 +1,70 @@ +package transformers + +import ( + "sort" + "testing" + + "github.com/dmachard/go-dnscollector/dnsutils" + "github.com/dmachard/go-dnscollector/pkgconfig" + "github.com/dmachard/go-logger" +) + +func TestReorderingTransform_SortByTimestamp(t *testing.T) { + // enable feature + config := pkgconfig.GetFakeConfigTransformers() + config.Reordering.Enable = true + + // initialize logger + log := logger.New(false) + + // create output channels + outChans := []chan dnsutils.DNSMessage{ + make(chan dnsutils.DNSMessage, 10), + } + + // initialize transformer + reorder := NewReorderingTransform(config, log, "test", 0, outChans) + + dm1 := dnsutils.GetFakeDNSMessage() + dm1.DNSTap.TimestampRFC3339 = "2024-12-20T21:12:14.786109Z" + + dm2 := dnsutils.GetFakeDNSMessage() + dm2.DNSTap.TimestampRFC3339 = "2024-12-20T21:12:14.766361Z" + + dm3 := dnsutils.GetFakeDNSMessage() + dm3.DNSTap.TimestampRFC3339 = "2024-12-20T21:12:14.803447Z" + + reorder.ReorderLogs(&dm1) + reorder.ReorderLogs(&dm2) + reorder.ReorderLogs(&dm3) + + // manually trigger a buffer flush + reorder.flushBuffer() + + // collect results from the output channel + var results []dnsutils.DNSMessage + done := false + for !done { + select { + case msg := <-outChans[0]: + results = append(results, msg) + default: + done = true + } + } + + // validate order + if len(results) != 3 { + t.Fatalf("Expected 3 messages, got %d", len(results)) + } + + timestamps := []string{ + results[0].DNSTap.TimestampRFC3339, + results[1].DNSTap.TimestampRFC3339, + results[2].DNSTap.TimestampRFC3339, + } + + if !sort.StringsAreSorted(timestamps) { + t.Errorf("Timestamps are not sorted: %v", timestamps) + } +} diff --git a/transformers/transformers.go b/transformers/transformers.go index 808bcc67..949851d5 100644 --- a/transformers/transformers.go +++ b/transformers/transformers.go @@ -87,6 +87,7 @@ func NewTransforms(config *pkgconfig.ConfigTransformers, logger *logger.Logger, d.availableTransforms = append(d.availableTransforms, TransformEntry{NewDNSGeoIPTransform(config, logger, name, instance, nextWorkers)}) d.availableTransforms = append(d.availableTransforms, TransformEntry{NewRewriteTransform(config, logger, name, instance, nextWorkers)}) d.availableTransforms = append(d.availableTransforms, TransformEntry{NewNewDomainTrackerTransform(config, logger, name, instance, nextWorkers)}) + d.availableTransforms = append(d.availableTransforms, TransformEntry{NewReorderingTransform(config, logger, name, instance, nextWorkers)}) d.Prepare() return d From 6dcb49a5421bf03c93e9c6db0c92ea6f56045849 Mon Sep 17 00:00:00 2001 From: dmachard <5562930+dmachard@users.noreply.github.com> Date: Sat, 21 Dec 2024 10:26:23 +0100 Subject: [PATCH 2/2] add docs --- README.md | 5 ++-- docs/transformers/transform_reordering.md | 20 ++++++++++++++++ transformers/reordering.go | 28 +++++++++++++++-------- 3 files changed, 41 insertions(+), 12 deletions(-) create mode 100644 docs/transformers/transform_reordering.md diff --git a/README.md b/README.md index f125b1f1..b2f5111a 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,9 @@
-
+
-
+
@@ -86,6 +86,7 @@ - Add [Geographical](docs/transformers/transform_geoip.md) metadata - Various data [Extractor](docs/transformers/transform_dataextractor.md) - Suspicious traffic [Detector](docs/transformers/transform_suspiciousdetector.md) and [Prediction](docs/transformers/transform_trafficprediction.md) + - [Reordering](docs/transformers/transform_reordering.md) DNS messages based on timestamps ## Get Started diff --git a/docs/transformers/transform_reordering.md b/docs/transformers/transform_reordering.md new file mode 100644 index 00000000..97104cf0 --- /dev/null +++ b/docs/transformers/transform_reordering.md @@ -0,0 +1,20 @@ +# Transformer: Reordering + +Use this transformer to reorder DNS messages based on their timestamp. This can be useful when processing logs that may not be ordered correctly, ensuring they are sorted before further processing. + +The transformer buffers DNS messages and periodically flushes them based on a configurable interval. The messages are sorted by timestamp before being passed to the next workers. + +Options: + +* `flush-interval` (int) + > Defines the interval (in seconds) at which the buffer will be flushed automatically. A smaller value will lead to more frequent flushing. + +* `max-buffer-size` (int) + > Defines the maximum number of messages that can be buffered before the transformer triggers a flush. Once this limit is reached, the buffer will be flushed regardless of the flush interval. + +```yaml +transforms: + reordering: + flush-interval: 30 + max-buffer-size: 100 +``` \ No newline at end of file diff --git a/transformers/reordering.go b/transformers/reordering.go index ea020292..a2fb8659 100644 --- a/transformers/reordering.go +++ b/transformers/reordering.go @@ -15,6 +15,7 @@ type ReorderingTransform struct { buffer []dnsutils.DNSMessage mutex sync.Mutex flushTicker *time.Ticker + flushSignal chan struct{} stopChan chan struct{} nextWorkers []chan dnsutils.DNSMessage } @@ -24,6 +25,7 @@ func NewReorderingTransform(config *pkgconfig.ConfigTransformers, logger *logger t := &ReorderingTransform{ GenericTransformer: NewTransformer(config, logger, "reordering", name, instance, nextWorkers), stopChan: make(chan struct{}), + flushSignal: make(chan struct{}), nextWorkers: nextWorkers, } @@ -37,7 +39,7 @@ func (t *ReorderingTransform) GetTransforms() ([]Subtransform, error) { subtransforms = append(subtransforms, Subtransform{name: "reordering:sort-by-timestamp", processFunc: t.ReorderLogs}) // Start a goroutine to handle periodic flushing. t.flushTicker = time.NewTicker(time.Duration(t.config.Reordering.FlushInterval) * time.Second) - t.buffer = make([]dnsutils.DNSMessage, t.config.Reordering.MaxBufferSize) + t.buffer = make([]dnsutils.DNSMessage, 0) go t.flushPeriodically() } @@ -46,24 +48,28 @@ func (t *ReorderingTransform) GetTransforms() ([]Subtransform, error) { // ReorderLogs adds a log to the buffer and flushes if the buffer is full. func (t *ReorderingTransform) ReorderLogs(dm *dnsutils.DNSMessage) (int, error) { - t.mutex.Lock() - defer t.mutex.Unlock() - // Add the log to the buffer. + t.mutex.Lock() t.buffer = append(t.buffer, *dm) - + t.mutex.Unlock() // If the buffer exceeds a certain size, flush it. - const bufferSize = 100 - if len(t.buffer) >= bufferSize { - go t.flushBuffer() + if len(t.buffer) >= t.config.Reordering.MaxBufferSize { + select { + case t.flushSignal <- struct{}{}: + default: + } } - return ReturnKeep, nil + return ReturnDrop, nil } // Close stops the periodic flushing. func (t *ReorderingTransform) Reset() { - close(t.stopChan) + select { + case <-t.stopChan: + default: + close(t.stopChan) + } } // flushPeriodically periodically flushes the buffer based on a timer. @@ -72,6 +78,8 @@ func (t *ReorderingTransform) flushPeriodically() { select { case <-t.flushTicker.C: t.flushBuffer() + case <-t.flushSignal: + t.flushBuffer() case <-t.stopChan: t.flushTicker.Stop() return