diff --git a/cmd/filter/config.go b/cmd/filter/config.go index ab99965a1..63066cde4 100644 --- a/cmd/filter/config.go +++ b/cmd/filter/config.go @@ -30,6 +30,8 @@ type filterConfig struct { DropMetricsTTL string `yaml:"drop_metrics_ttl"` // Flags for compatibility with different graphite behaviours Compatibility compatibility `yaml:"graphite_compatibility"` + // Time after which the batch of metrics is forced to be saved, default is 1s + BatchForcedSaveTimeout string `yaml:"batch_forced_save_timeout"` } func getDefault() config { @@ -45,12 +47,13 @@ func getDefault() config { LogPrettyFormat: false, }, Filter: filterConfig{ - Listen: ":2003", - RetentionConfig: "/etc/moira/storage-schemas.conf", - CacheCapacity: 10, //nolint - MaxParallelMatches: 0, - PatternsUpdatePeriod: "1s", - DropMetricsTTL: "1h", + Listen: ":2003", + RetentionConfig: "/etc/moira/storage-schemas.conf", + CacheCapacity: 10, //nolint + MaxParallelMatches: 0, + PatternsUpdatePeriod: "1s", + DropMetricsTTL: "1h", + BatchForcedSaveTimeout: "1s", Compatibility: compatibility{ AllowRegexLooseStartMatch: false, AllowRegexMatchEmpty: true, diff --git a/cmd/filter/main.go b/cmd/filter/main.go index 42d1a9134..b4624302d 100644 --- a/cmd/filter/main.go +++ b/cmd/filter/main.go @@ -143,7 +143,8 @@ func main() { // Start metrics matcher cacheCapacity := config.Filter.CacheCapacity - metricsMatcher := matchedmetrics.NewMetricsMatcher(filterMetrics, logger, database, cacheStorage, cacheCapacity) + batchForcedSaveTimeout := to.Duration(config.Filter.BatchForcedSaveTimeout) + metricsMatcher := matchedmetrics.NewMetricsMatcher(filterMetrics, logger, database, cacheStorage, cacheCapacity, batchForcedSaveTimeout) metricsMatcher.Start(metricsChan) defer metricsMatcher.Wait() // First stop listener defer stopListener(listener) // Then waiting for metrics matcher handle all received events diff --git a/filter/matched_metrics/metrics.go b/filter/matched_metrics/metrics.go index 9e355261b..c3a3f9508 100644 --- a/filter/matched_metrics/metrics.go +++ b/filter/matched_metrics/metrics.go @@ -12,13 +12,14 @@ import ( // MetricsMatcher make buffer of metrics and save it. type MetricsMatcher struct { - logger moira.Logger - metrics *metrics.FilterMetrics - database moira.Database - cacheStorage *filter.Storage - cacheCapacity int - waitGroup *sync.WaitGroup - closeRequest chan struct{} + logger moira.Logger + metrics *metrics.FilterMetrics + database moira.Database + cacheStorage *filter.Storage + cacheCapacity int + waitGroup *sync.WaitGroup + closeRequest chan struct{} + batchForcedSaveTimeout time.Duration } // NewMetricsMatcher creates new MetricsMatcher. @@ -28,15 +29,17 @@ func NewMetricsMatcher( database moira.Database, cacheStorage *filter.Storage, cacheCapacity int, + batchForcedSaveTimeout time.Duration, ) *MetricsMatcher { return &MetricsMatcher{ - metrics: metrics, - logger: logger, - database: database, - cacheStorage: cacheStorage, - cacheCapacity: cacheCapacity, - waitGroup: &sync.WaitGroup{}, - closeRequest: make(chan struct{}), + metrics: metrics, + logger: logger, + database: database, + cacheStorage: cacheStorage, + cacheCapacity: cacheCapacity, + waitGroup: &sync.WaitGroup{}, + closeRequest: make(chan struct{}), + batchForcedSaveTimeout: batchForcedSaveTimeout, } } @@ -63,7 +66,7 @@ func (matcher *MetricsMatcher) receiveBatch(metrics <-chan *moira.MatchedMetric) go func() { defer close(batchedMetrics) - batchTimer := time.NewTimer(time.Second) + batchTimer := time.NewTimer(matcher.batchForcedSaveTimeout) defer batchTimer.Stop() for { batch := make(map[string]*moira.MatchedMetric, matcher.cacheCapacity)