diff --git a/ee/query-service/app/api/api.go b/ee/query-service/app/api/api.go index 24989c8140..bc9d14cd4c 100644 --- a/ee/query-service/app/api/api.go +++ b/ee/query-service/app/api/api.go @@ -11,6 +11,7 @@ import ( "go.signoz.io/signoz/ee/query-service/interfaces" "go.signoz.io/signoz/ee/query-service/license" "go.signoz.io/signoz/ee/query-service/usage" + "go.signoz.io/signoz/pkg/alertmanager" baseapp "go.signoz.io/signoz/pkg/query-service/app" "go.signoz.io/signoz/pkg/query-service/app/cloudintegrations" "go.signoz.io/signoz/pkg/query-service/app/integrations" @@ -20,6 +21,7 @@ import ( basemodel "go.signoz.io/signoz/pkg/query-service/model" rules "go.signoz.io/signoz/pkg/query-service/rules" "go.signoz.io/signoz/pkg/query-service/version" + "go.signoz.io/signoz/pkg/signoz" "go.signoz.io/signoz/pkg/types/authtypes" ) @@ -51,7 +53,7 @@ type APIHandler struct { } // NewAPIHandler returns an APIHandler -func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) { +func NewAPIHandler(opts APIHandlerOptions, signoz *signoz.SigNoz) (*APIHandler, error) { baseHandler, err := baseapp.NewAPIHandler(baseapp.APIHandlerOpts{ Reader: opts.DataConnector, @@ -67,6 +69,7 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) { FluxInterval: opts.FluxInterval, UseLogsNewSchema: opts.UseLogsNewSchema, UseTraceNewSchema: opts.UseTraceNewSchema, + AlertmanagerAPI: alertmanager.NewAPI(signoz.Alertmanager), }) if err != nil { diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index 685c7910b5..aaf42d4f02 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -267,7 +267,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { JWT: serverOptions.Jwt, } - apiHandler, err := api.NewAPIHandler(apiOpts) + apiHandler, err := api.NewAPIHandler(apiOpts, serverOptions.SigNoz) if err != nil { return nil, err } diff --git a/ee/query-service/main.go b/ee/query-service/main.go index 5fab8286c9..499034313c 100644 
--- a/ee/query-service/main.go +++ b/ee/query-service/main.go @@ -7,7 +7,6 @@ import ( "os" "os/signal" "strconv" - "syscall" "time" "go.opentelemetry.io/otel/sdk/resource" @@ -198,16 +197,19 @@ func main() { zap.L().Fatal("Failed to initialize auth cache", zap.Error(err)) } - signalsChannel := make(chan os.Signal, 1) - signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM) + signoz.Start(context.Background()) - for { - select { - case status := <-server.HealthCheckStatus(): - zap.L().Info("Received HealthCheck status: ", zap.Int("status", int(status))) - case <-signalsChannel: - zap.L().Fatal("Received OS Interrupt Signal ... ") - server.Stop() - } + if err := signoz.Wait(context.Background()); err != nil { + zap.L().Fatal("Failed to start signoz", zap.Error(err)) + } + + err = server.Stop() + if err != nil { + zap.L().Fatal("Failed to stop server", zap.Error(err)) + } + + err = signoz.Stop(context.Background()) + if err != nil { + zap.L().Fatal("Failed to stop signoz", zap.Error(err)) } } diff --git a/go.mod b/go.mod index 689109eba2..76198ea7c1 100644 --- a/go.mod +++ b/go.mod @@ -55,6 +55,7 @@ require ( github.com/soheilhy/cmux v0.1.5 github.com/srikanthccv/ClickHouse-go-mock v0.9.0 github.com/stretchr/testify v1.10.0 + github.com/tidwall/gjson v1.18.0 github.com/uptrace/bun v1.2.9 github.com/uptrace/bun/dialect/pgdialect v1.2.9 github.com/uptrace/bun/dialect/sqlitedialect v1.2.9 @@ -204,6 +205,8 @@ require ( github.com/smarty/assertions v1.15.0 // indirect github.com/spf13/cobra v1.8.1 // indirect github.com/spf13/pflag v1.0.5 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.0 // indirect github.com/tklauser/go-sysconf v0.3.13 // indirect github.com/tklauser/numcpus v0.7.0 // indirect github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc // indirect diff --git a/go.sum b/go.sum index ad7dc839f5..d48dc7ecde 100644 --- a/go.sum +++ b/go.sum @@ -891,7 +891,13 @@ github.com/stretchr/testify v1.9.0/go.mod 
h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8 github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/subosito/gotenv v1.4.1/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0= +github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY= +github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= +github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tklauser/go-sysconf v0.3.13 h1:GBUpcahXSpR2xN01jhkNAbTLRk2Yzgggk8IM08lq3r4= github.com/tklauser/go-sysconf v0.3.13/go.mod h1:zwleP4Q4OehZHGn4CYZDipCgg9usW5IJePewFCGVEa0= github.com/tklauser/numcpus v0.7.0 h1:yjuerZP127QG9m5Zh/mSO4wqurYil27tHrqwRoRjpr4= diff --git a/pkg/alertmanager/alertmanager.go b/pkg/alertmanager/alertmanager.go index a4c916fa9e..f1558ca629 100644 --- a/pkg/alertmanager/alertmanager.go +++ b/pkg/alertmanager/alertmanager.go @@ -26,6 +26,9 @@ type Alertmanager interface { // ListChannels lists all channels for the organization. ListChannels(context.Context, string) ([]*alertmanagertypes.Channel, error) + // ListAllChannels lists all channels for all organizations. It is used by the legacy alertmanager only. + ListAllChannels(context.Context) ([]*alertmanagertypes.Channel, error) + // GetChannelByID gets a channel for the organization. 
GetChannelByID(context.Context, string, int) (*alertmanagertypes.Channel, error) diff --git a/pkg/alertmanager/alertmanagerbatcher/batcher.go b/pkg/alertmanager/alertmanagerbatcher/batcher.go index 4ea8b624e7..a4474a81f8 100644 --- a/pkg/alertmanager/alertmanagerbatcher/batcher.go +++ b/pkg/alertmanager/alertmanagerbatcher/batcher.go @@ -55,7 +55,6 @@ func (batcher *Batcher) Start(ctx context.Context) error { go func() { defer batcher.goroutinesWg.Done() - batcher.logger.InfoContext(ctx, "starting alertmanager batcher") for { select { case <-batcher.stopC: @@ -106,7 +105,6 @@ func (batcher *Batcher) Add(ctx context.Context, alerts ...*alertmanagertypes.Po // Stop shuts down the batcher. func (batcher *Batcher) Stop(ctx context.Context) { - batcher.logger.InfoContext(ctx, "Stopping alertmanager batcher") close(batcher.stopC) batcher.goroutinesWg.Wait() } diff --git a/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore/config.go b/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore/config.go index da28a9f9df..da8223301f 100644 --- a/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore/config.go +++ b/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore/config.go @@ -224,3 +224,19 @@ func (store *config) ListChannels(ctx context.Context, orgID string) ([]*alertma return channels, nil } + +func (store *config) ListAllChannels(ctx context.Context) ([]*alertmanagertypes.Channel, error) { + var channels []*alertmanagertypes.Channel + + err := store. + sqlstore. + BunDB(). + NewSelect(). + Model(&channels). 
+ Scan(ctx) + if err != nil { + return nil, err + } + + return channels, nil +} diff --git a/pkg/alertmanager/api.go b/pkg/alertmanager/api.go index ccaf118aab..1f7e7bbaa7 100644 --- a/pkg/alertmanager/api.go +++ b/pkg/alertmanager/api.go @@ -24,7 +24,7 @@ func NewAPI(alertmanager Alertmanager) *API { } } -func (api *API) GetAlerts(req *http.Request, rw http.ResponseWriter) { +func (api *API) GetAlerts(rw http.ResponseWriter, req *http.Request) { ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second) defer cancel() @@ -49,7 +49,7 @@ func (api *API) GetAlerts(req *http.Request, rw http.ResponseWriter) { render.Success(rw, http.StatusOK, alerts) } -func (api *API) TestReceiver(req *http.Request, rw http.ResponseWriter) { +func (api *API) TestReceiver(rw http.ResponseWriter, req *http.Request) { ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second) defer cancel() @@ -81,7 +81,7 @@ func (api *API) TestReceiver(req *http.Request, rw http.ResponseWriter) { render.Success(rw, http.StatusNoContent, nil) } -func (api *API) ListChannels(req *http.Request, rw http.ResponseWriter) { +func (api *API) ListChannels(rw http.ResponseWriter, req *http.Request) { ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second) defer cancel() @@ -100,7 +100,20 @@ func (api *API) ListChannels(req *http.Request, rw http.ResponseWriter) { render.Success(rw, http.StatusOK, channels) } -func (api *API) GetChannelByID(req *http.Request, rw http.ResponseWriter) { +func (api *API) ListAllChannels(rw http.ResponseWriter, req *http.Request) { + ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second) + defer cancel() + + channels, err := api.alertmanager.ListAllChannels(ctx) + if err != nil { + render.Error(rw, err) + return + } + + render.Success(rw, http.StatusOK, channels) +} + +func (api *API) GetChannelByID(rw http.ResponseWriter, req *http.Request) { ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second) defer cancel() @@ -137,7 +150,7 @@ func 
(api *API) GetChannelByID(req *http.Request, rw http.ResponseWriter) { render.Success(rw, http.StatusOK, channel) } -func (api *API) UpdateChannelByID(req *http.Request, rw http.ResponseWriter) { +func (api *API) UpdateChannelByID(rw http.ResponseWriter, req *http.Request) { ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second) defer cancel() @@ -187,7 +200,7 @@ func (api *API) UpdateChannelByID(req *http.Request, rw http.ResponseWriter) { render.Success(rw, http.StatusNoContent, nil) } -func (api *API) DeleteChannelByID(req *http.Request, rw http.ResponseWriter) { +func (api *API) DeleteChannelByID(rw http.ResponseWriter, req *http.Request) { ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second) defer cancel() @@ -224,7 +237,7 @@ func (api *API) DeleteChannelByID(req *http.Request, rw http.ResponseWriter) { render.Success(rw, http.StatusNoContent, nil) } -func (api *API) CreateChannel(req *http.Request, rw http.ResponseWriter) { +func (api *API) CreateChannel(rw http.ResponseWriter, req *http.Request) { ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second) defer cancel() diff --git a/pkg/alertmanager/config.go b/pkg/alertmanager/config.go index 40505045a2..861c707a8f 100644 --- a/pkg/alertmanager/config.go +++ b/pkg/alertmanager/config.go @@ -30,8 +30,8 @@ type Signoz struct { } type Legacy struct { - // URL is the URL of the legacy alertmanager. - URL string `mapstructure:"url"` + // ApiURL is the URL of the legacy signoz alertmanager. 
+ ApiURL string `mapstructure:"api_url"` } func NewConfigFactory() factory.ConfigFactory { @@ -42,7 +42,7 @@ func newConfig() factory.Config { return Config{ Provider: "legacy", Legacy: Legacy{ - URL: "http://alertmanager:9093/api", + ApiURL: "http://alertmanager:9093/api", }, Signoz: Signoz{ PollInterval: 15 * time.Second, @@ -53,13 +53,13 @@ func newConfig() factory.Config { func (c Config) Validate() error { if c.Provider == "legacy" { - if c.Legacy.URL == "" { - return errors.New("url is required") + if c.Legacy.ApiURL == "" { + return errors.New("api_url is required") } - _, err := url.Parse(c.Legacy.URL) + _, err := url.Parse(c.Legacy.ApiURL) if err != nil { - return fmt.Errorf("url %q is invalid: %w", c.Legacy.URL, err) + return fmt.Errorf("api_url %q is invalid: %w", c.Legacy.ApiURL, err) } } diff --git a/pkg/alertmanager/config_test.go b/pkg/alertmanager/config_test.go index e92e703cea..435e92efb7 100644 --- a/pkg/alertmanager/config_test.go +++ b/pkg/alertmanager/config_test.go @@ -13,7 +13,7 @@ import ( func TestNewWithEnvProvider(t *testing.T) { t.Setenv("SIGNOZ_ALERTMANAGER_PROVIDER", "legacy") - t.Setenv("SIGNOZ_ALERTMANAGER_LEGACY_URL", "http://localhost:9093/api") + t.Setenv("SIGNOZ_ALERTMANAGER_LEGACY_API__URL", "http://localhost:9093/api") conf, err := config.New( context.Background(), @@ -38,7 +38,7 @@ func TestNewWithEnvProvider(t *testing.T) { expected := &Config{ Provider: "legacy", Legacy: Legacy{ - URL: "http://localhost:9093/api", + ApiURL: "http://localhost:9093/api", }, Signoz: def.Signoz, } diff --git a/pkg/alertmanager/legacyalertmanager/provider.go b/pkg/alertmanager/legacyalertmanager/provider.go index 9f988718f4..9d9e2427a3 100644 --- a/pkg/alertmanager/legacyalertmanager/provider.go +++ b/pkg/alertmanager/legacyalertmanager/provider.go @@ -10,6 +10,7 @@ import ( "net/url" "time" + "github.com/tidwall/gjson" "go.signoz.io/signoz/pkg/alertmanager" "go.signoz.io/signoz/pkg/alertmanager/alertmanagerbatcher" 
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore" @@ -43,7 +44,7 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config settings := factory.NewScopedProviderSettings(providerSettings, "go.signoz.io/signoz/pkg/alertmanager/legacyalertmanager") configStore := sqlalertmanagerstore.NewConfigStore(sqlstore) - url, err := url.Parse(config.Legacy.URL) + url, err := url.Parse(config.Legacy.ApiURL) if err != nil { return nil, err } @@ -96,8 +97,12 @@ func (provider *provider) GetAlerts(ctx context.Context, orgID string, params al return nil, err } + if resp.StatusCode/100 != 2 { + return nil, fmt.Errorf("bad response status %v", resp.Status) + } + var alerts alertmanagertypes.GettableAlerts - if err := json.Unmarshal(body, &alerts); err != nil { + if err := json.Unmarshal([]byte(gjson.GetBytes(body, "data").Raw), &alerts); err != nil { return nil, err } @@ -171,6 +176,10 @@ func (provider *provider) ListChannels(ctx context.Context, orgID string) ([]*al return provider.configStore.ListChannels(ctx, orgID) } +func (provider *provider) ListAllChannels(ctx context.Context) ([]*alertmanagertypes.Channel, error) { + return provider.configStore.ListAllChannels(ctx) +} + func (provider *provider) GetChannelByID(ctx context.Context, orgID string, channelID int) (*alertmanagertypes.Channel, error) { return provider.configStore.GetChannelByID(ctx, orgID, channelID) } diff --git a/pkg/alertmanager/signozalertmanager/provider.go b/pkg/alertmanager/signozalertmanager/provider.go index 43b6ac95c9..105580a529 100644 --- a/pkg/alertmanager/signozalertmanager/provider.go +++ b/pkg/alertmanager/signozalertmanager/provider.go @@ -6,6 +6,7 @@ import ( "go.signoz.io/signoz/pkg/alertmanager" "go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore" + "go.signoz.io/signoz/pkg/errors" "go.signoz.io/signoz/pkg/factory" "go.signoz.io/signoz/pkg/sqlstore" "go.signoz.io/signoz/pkg/types/alertmanagertypes" @@ -17,6 +18,7 @@ 
type provider struct { settings factory.ScopedProviderSettings configStore alertmanagertypes.ConfigStore stateStore alertmanagertypes.StateStore + stopC chan struct{} } func NewFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config] { @@ -42,6 +44,7 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config config: config, configStore: configStore, stateStore: stateStore, + stopC: make(chan struct{}), }, nil } @@ -50,7 +53,7 @@ func (provider *provider) Start(ctx context.Context) error { defer ticker.Stop() for { select { - case <-ctx.Done(): + case <-provider.stopC: return nil case <-ticker.C: if err := provider.service.SyncServers(ctx); err != nil { @@ -61,6 +64,7 @@ func (provider *provider) Start(ctx context.Context) error { } func (provider *provider) Stop(ctx context.Context) error { + close(provider.stopC) return provider.service.Stop(ctx) } @@ -91,6 +95,10 @@ func (provider *provider) ListChannels(ctx context.Context, orgID string) ([]*al return channelList, nil } +func (provider *provider) ListAllChannels(ctx context.Context) ([]*alertmanagertypes.Channel, error) { + return nil, errors.Newf(errors.TypeUnsupported, errors.CodeUnsupported, "not supported by provider signoz") +} + func (provider *provider) GetChannelByID(ctx context.Context, orgID string, channelID int) (*alertmanagertypes.Channel, error) { config, err := provider.configStore.Get(ctx, orgID) if err != nil { diff --git a/pkg/factory/registry.go b/pkg/factory/registry.go index 2eda44279b..be7d95c6d4 100644 --- a/pkg/factory/registry.go +++ b/pkg/factory/registry.go @@ -40,7 +40,7 @@ func NewRegistry(logger *slog.Logger, services ...NamedService) (*Registry, erro }, nil } -func (r *Registry) Start(ctx context.Context) error { +func (r *Registry) Start(ctx context.Context) { for _, s := range r.services.GetInOrder() { go func(s NamedService) { r.logger.InfoContext(ctx, "starting service", "service", s.Name()) @@ -49,7 
+49,6 @@ func (r *Registry) Start(ctx context.Context) error { }(s) } - return nil } func (r *Registry) Wait(ctx context.Context) error { diff --git a/pkg/factory/registry_test.go b/pkg/factory/registry_test.go index 57a7b0df67..6d55a3bec2 100644 --- a/pkg/factory/registry_test.go +++ b/pkg/factory/registry_test.go @@ -41,7 +41,7 @@ func TestRegistryWith2Services(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - require.NoError(t, registry.Start(ctx)) + registry.Start(ctx) require.NoError(t, registry.Wait(ctx)) require.NoError(t, registry.Stop(ctx)) }() @@ -62,7 +62,7 @@ func TestRegistryWith2ServicesWithoutWait(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - require.NoError(t, registry.Start(ctx)) + registry.Start(ctx) require.NoError(t, registry.Stop(ctx)) }() diff --git a/pkg/http/middleware/response.go b/pkg/http/middleware/response.go index deb0f3dd81..8d6f4d068e 100644 --- a/pkg/http/middleware/response.go +++ b/pkg/http/middleware/response.go @@ -78,6 +78,12 @@ func (writer *nonFlushingBadResponseLoggingWriter) Write(data []byte) (int, erro // https://godoc.org/net/http#ResponseWriter writer.WriteHeader(http.StatusOK) } + + // 204 No Content must not carry a body, so the payload is intentionally discarded. Report len(data) as written: io.Writer requires a non-nil error whenever n < len(p), and returning (0, nil) makes callers such as io.Copy fail with io.ErrShortWrite. + if writer.statusCode == http.StatusNoContent { + return len(data), nil + } + n, err := writer.rw.Write(data) if writer.logBody { writer.captureResponseBody(data) diff --git a/pkg/instrumentation/sdk.go b/pkg/instrumentation/sdk.go index f4059d3a6e..c31cf5796d 100644 --- a/pkg/instrumentation/sdk.go +++ b/pkg/instrumentation/sdk.go @@ -23,6 +23,7 @@ type SDK struct { logger *slog.Logger sdk contribsdkconfig.SDK prometheusRegistry *prometheus.Registry + startCh chan struct{} } // New creates a new Instrumentation instance with configured providers. 
@@ -96,14 +97,17 @@ func New(ctx context.Context, build version.Build, cfg Config) (*SDK, error) { sdk: sdk, prometheusRegistry: prometheusRegistry, logger: NewLogger(cfg), + startCh: make(chan struct{}), }, nil } func (i *SDK) Start(ctx context.Context) error { + <-i.startCh return nil } func (i *SDK) Stop(ctx context.Context) error { + close(i.startCh) return i.sdk.Shutdown(ctx) } diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 4a9a62b9b1..b2ab2d0c32 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -6,7 +6,6 @@ import ( "encoding/json" "errors" "fmt" - "go.signoz.io/signoz/pkg/query-service/app/metricsexplorer" "io" "math" "net/http" @@ -19,6 +18,9 @@ import ( "text/template" "time" + "go.signoz.io/signoz/pkg/alertmanager" + "go.signoz.io/signoz/pkg/query-service/app/metricsexplorer" + "github.com/gorilla/mux" "github.com/gorilla/websocket" jsoniter "github.com/json-iterator/go" @@ -58,7 +60,6 @@ import ( "go.signoz.io/signoz/pkg/query-service/app/integrations/messagingQueues/kafka" "go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline" "go.signoz.io/signoz/pkg/query-service/dao" - am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager" "go.signoz.io/signoz/pkg/query-service/interfaces" "go.signoz.io/signoz/pkg/query-service/model" "go.signoz.io/signoz/pkg/query-service/rules" @@ -84,7 +85,6 @@ type APIHandler struct { reader interfaces.Reader skipConfig *model.SkipConfig appDao dao.ModelDao - alertManager am.Manager ruleManager *rules.Manager featureFlags interfaces.FeatureLookup querier interfaces.Querier @@ -133,6 +133,8 @@ type APIHandler struct { pvcsRepo *inframetrics.PvcsRepo JWT *authtypes.JWT + + AlertmanagerAPI *alertmanager.API } type APIHandlerOpts struct { @@ -174,16 +176,12 @@ type APIHandlerOpts struct { UseTraceNewSchema bool JWT *authtypes.JWT + + AlertmanagerAPI *alertmanager.API } // NewAPIHandler returns an APIHandler func 
NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { - - alertManager, err := am.New() - if err != nil { - return nil, err - } - querierOpts := querier.QuerierOptions{ Reader: opts.Reader, Cache: opts.Cache, @@ -227,7 +225,6 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { skipConfig: opts.SkipConfig, preferSpanMetrics: opts.PreferSpanMetrics, temporalityMap: make(map[string]map[v3.Temporality]bool), - alertManager: alertManager, ruleManager: opts.RuleManager, featureFlags: opts.FeatureFlags, IntegrationsController: opts.IntegrationsController, @@ -250,6 +247,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { pvcsRepo: pvcsRepo, JWT: opts.JWT, SummaryService: summaryService, + AlertmanagerAPI: opts.AlertmanagerAPI, } logsQueryBuilder := logsv3.PrepareLogsQuery @@ -483,21 +481,21 @@ func (aH *APIHandler) Respond(w http.ResponseWriter, data interface{}) { // RegisterPrivateRoutes registers routes for this handler on the given router func (aH *APIHandler) RegisterPrivateRoutes(router *mux.Router) { - router.HandleFunc("/api/v1/channels", aH.listChannels).Methods(http.MethodGet) + router.HandleFunc("/api/v1/channels", aH.AlertmanagerAPI.ListAllChannels).Methods(http.MethodGet) } // RegisterRoutes registers routes for this handler on the given router func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *AuthMiddleware) { router.HandleFunc("/api/v1/query_range", am.ViewAccess(aH.queryRangeMetrics)).Methods(http.MethodGet) router.HandleFunc("/api/v1/query", am.ViewAccess(aH.queryMetrics)).Methods(http.MethodGet) - router.HandleFunc("/api/v1/channels", am.ViewAccess(aH.listChannels)).Methods(http.MethodGet) - router.HandleFunc("/api/v1/channels/{id}", am.ViewAccess(aH.getChannel)).Methods(http.MethodGet) - router.HandleFunc("/api/v1/channels/{id}", am.AdminAccess(aH.editChannel)).Methods(http.MethodPut) - router.HandleFunc("/api/v1/channels/{id}", am.AdminAccess(aH.deleteChannel)).Methods(http.MethodDelete) - 
router.HandleFunc("/api/v1/channels", am.EditAccess(aH.createChannel)).Methods(http.MethodPost) - router.HandleFunc("/api/v1/testChannel", am.EditAccess(aH.testChannel)).Methods(http.MethodPost) + router.HandleFunc("/api/v1/channels", am.ViewAccess(aH.AlertmanagerAPI.ListChannels)).Methods(http.MethodGet) + router.HandleFunc("/api/v1/channels/{id}", am.ViewAccess(aH.AlertmanagerAPI.GetChannelByID)).Methods(http.MethodGet) + router.HandleFunc("/api/v1/channels/{id}", am.AdminAccess(aH.AlertmanagerAPI.UpdateChannelByID)).Methods(http.MethodPut) + router.HandleFunc("/api/v1/channels/{id}", am.AdminAccess(aH.AlertmanagerAPI.DeleteChannelByID)).Methods(http.MethodDelete) + router.HandleFunc("/api/v1/channels", am.EditAccess(aH.AlertmanagerAPI.CreateChannel)).Methods(http.MethodPost) + router.HandleFunc("/api/v1/testChannel", am.EditAccess(aH.AlertmanagerAPI.TestReceiver)).Methods(http.MethodPost) - router.HandleFunc("/api/v1/alerts", am.ViewAccess(aH.getAlerts)).Methods(http.MethodGet) + router.HandleFunc("/api/v1/alerts", am.ViewAccess(aH.AlertmanagerAPI.GetAlerts)).Methods(http.MethodGet) router.HandleFunc("/api/v1/rules", am.ViewAccess(aH.listRules)).Methods(http.MethodGet) router.HandleFunc("/api/v1/rules/{id}", am.ViewAccess(aH.getRule)).Methods(http.MethodGet) @@ -1360,138 +1358,6 @@ func (aH *APIHandler) editRule(w http.ResponseWriter, r *http.Request) { } -func (aH *APIHandler) getChannel(w http.ResponseWriter, r *http.Request) { - id := mux.Vars(r)["id"] - channel, apiErrorObj := aH.ruleManager.RuleDB().GetChannel(id) - if apiErrorObj != nil { - RespondError(w, apiErrorObj, nil) - return - } - aH.Respond(w, channel) -} - -func (aH *APIHandler) deleteChannel(w http.ResponseWriter, r *http.Request) { - id := mux.Vars(r)["id"] - apiErrorObj := aH.ruleManager.RuleDB().DeleteChannel(id) - if apiErrorObj != nil { - RespondError(w, apiErrorObj, nil) - return - } - aH.Respond(w, "notification channel successfully deleted") -} - -func (aH *APIHandler) listChannels(w 
http.ResponseWriter, r *http.Request) { - channels, apiErrorObj := aH.ruleManager.RuleDB().GetChannels() - if apiErrorObj != nil { - RespondError(w, apiErrorObj, nil) - return - } - aH.Respond(w, channels) -} - -// testChannels sends test alert to all registered channels -func (aH *APIHandler) testChannel(w http.ResponseWriter, r *http.Request) { - - defer r.Body.Close() - body, err := io.ReadAll(r.Body) - if err != nil { - zap.L().Error("Error in getting req body of testChannel API", zap.Error(err)) - RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) - return - } - - receiver := &am.Receiver{} - if err := json.Unmarshal(body, receiver); err != nil { // Parse []byte to go struct pointer - zap.L().Error("Error in parsing req body of testChannel API\n", zap.Error(err)) - RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) - return - } - // send alert - apiErrorObj := aH.alertManager.TestReceiver(receiver) - if apiErrorObj != nil { - RespondError(w, apiErrorObj, nil) - return - } - aH.Respond(w, "test alert sent") -} - -func (aH *APIHandler) editChannel(w http.ResponseWriter, r *http.Request) { - - id := mux.Vars(r)["id"] - - defer r.Body.Close() - body, err := io.ReadAll(r.Body) - if err != nil { - zap.L().Error("Error in getting req body of editChannel API", zap.Error(err)) - RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) - return - } - - receiver := &am.Receiver{} - if err := json.Unmarshal(body, receiver); err != nil { // Parse []byte to go struct pointer - zap.L().Error("Error in parsing req body of editChannel API", zap.Error(err)) - RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) - return - } - - _, apiErrorObj := aH.ruleManager.RuleDB().EditChannel(receiver, id) - - if apiErrorObj != nil { - RespondError(w, apiErrorObj, nil) - return - } - - aH.Respond(w, nil) - -} - -func (aH *APIHandler) createChannel(w http.ResponseWriter, r *http.Request) { - - defer 
r.Body.Close() - body, err := io.ReadAll(r.Body) - if err != nil { - zap.L().Error("Error in getting req body of createChannel API", zap.Error(err)) - RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) - return - } - - receiver := &am.Receiver{} - if err := json.Unmarshal(body, receiver); err != nil { // Parse []byte to go struct pointer - zap.L().Error("Error in parsing req body of createChannel API", zap.Error(err)) - RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) - return - } - - _, apiErrorObj := aH.ruleManager.RuleDB().CreateChannel(receiver) - - if apiErrorObj != nil { - RespondError(w, apiErrorObj, nil) - return - } - - aH.Respond(w, nil) - -} - -func (aH *APIHandler) getAlerts(w http.ResponseWriter, r *http.Request) { - params := r.URL.Query() - amEndpoint := constants.GetAlertManagerApiPrefix() - resp, err := http.Get(amEndpoint + "v1/alerts" + "?" + params.Encode()) - if err != nil { - RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) - return - } - - defer resp.Body.Close() - body, err := io.ReadAll(resp.Body) - if err != nil { - RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) - return - } - - aH.Respond(w, string(body)) -} - func (aH *APIHandler) createRule(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index b2e6a5ddf0..651b3a6a1a 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -14,6 +14,7 @@ import ( "github.com/rs/cors" "github.com/soheilhy/cmux" + "go.signoz.io/signoz/pkg/alertmanager" "go.signoz.io/signoz/pkg/http/middleware" "go.signoz.io/signoz/pkg/query-service/agentConf" "go.signoz.io/signoz/pkg/query-service/app/clickhouseReader" @@ -196,6 +197,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { UseLogsNewSchema: serverOptions.UseLogsNewSchema, UseTraceNewSchema: serverOptions.UseTraceNewSchema, JWT: 
serverOptions.Jwt, + AlertmanagerAPI: alertmanager.NewAPI(serverOptions.SigNoz.Alertmanager), }) if err != nil { return nil, err @@ -278,7 +280,6 @@ func (s *Server) createPrivateServer(api *APIHandler) (*http.Server, error) { } func (s *Server) createPublicServer(api *APIHandler, web web.Web) (*http.Server, error) { - r := NewRouter() r.Use(middleware.NewAuth(zap.L(), s.serverOptions.Jwt, []string{"Authorization", "Sec-WebSocket-Protocol"}).Wrap) diff --git a/pkg/query-service/main.go b/pkg/query-service/main.go index 55c00a54c5..e9d3fc1d9f 100644 --- a/pkg/query-service/main.go +++ b/pkg/query-service/main.go @@ -4,8 +4,6 @@ import ( "context" "flag" "os" - "os/signal" - "syscall" "time" prommodel "github.com/prometheus/common/model" @@ -142,22 +140,20 @@ func main() { logger.Fatal("Failed to initialize auth cache", zap.Error(err)) } - signalsChannel := make(chan os.Signal, 1) - signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM) - - for { - select { - case status := <-server.HealthCheckStatus(): - logger.Info("Received HealthCheck status: ", zap.Int("status", int(status))) - case <-signalsChannel: - logger.Info("Received OS Interrupt Signal ... 
") - err := server.Stop() - if err != nil { - logger.Fatal("Failed to stop server", zap.Error(err)) - } - logger.Info("Server stopped") - return - } + signoz.Start(context.Background()) + + if err := signoz.Wait(context.Background()); err != nil { + zap.L().Fatal("Failed to start signoz", zap.Error(err)) + } + + err = server.Stop() + if err != nil { + zap.L().Fatal("Failed to stop server", zap.Error(err)) + } + + err = signoz.Stop(context.Background()) + if err != nil { + zap.L().Fatal("Failed to stop signoz", zap.Error(err)) } } diff --git a/pkg/signoz/config.go b/pkg/signoz/config.go index b8d188ceee..a1a5cdae5b 100644 --- a/pkg/signoz/config.go +++ b/pkg/signoz/config.go @@ -7,6 +7,7 @@ import ( "reflect" "time" + "go.signoz.io/signoz/pkg/alertmanager" "go.signoz.io/signoz/pkg/apiserver" "go.signoz.io/signoz/pkg/cache" "go.signoz.io/signoz/pkg/config" @@ -44,6 +45,9 @@ type Config struct { // TelemetryStore config TelemetryStore telemetrystore.Config `mapstructure:"telemetrystore"` + + // Alertmanager config + Alertmanager alertmanager.Config `mapstructure:"alertmanager"` } // DeprecatedFlags are the flags that are deprecated and scheduled for removal. @@ -63,6 +67,7 @@ func NewConfig(ctx context.Context, resolverConfig config.ResolverConfig, deprec sqlmigrator.NewConfigFactory(), apiserver.NewConfigFactory(), telemetrystore.NewConfigFactory(), + alertmanager.NewConfigFactory(), } conf, err := config.New(ctx, resolverConfig, configFactories) @@ -138,17 +143,26 @@ func mergeAndEnsureBackwardCompatibility(config *Config, deprecatedFlags Depreca } if deprecatedFlags.MaxIdleConns != 50 { - fmt.Println("[Deprecated] flag --max-idle-conns is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_MAX__IDLE__CONNS env variable instead.") + fmt.Println("[Deprecated] flag --max-idle-conns is deprecated and scheduled for removal. 
Please use SIGNOZ_TELEMETRYSTORE_MAX__IDLE__CONNS instead.") config.TelemetryStore.Connection.MaxIdleConns = deprecatedFlags.MaxIdleConns } if deprecatedFlags.MaxOpenConns != 100 { - fmt.Println("[Deprecated] flag --max-open-conns is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_MAX__OPEN__CONNS env variable instead.") + fmt.Println("[Deprecated] flag --max-open-conns is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_MAX__OPEN__CONNS instead.") config.TelemetryStore.Connection.MaxOpenConns = deprecatedFlags.MaxOpenConns } if deprecatedFlags.DialTimeout != 5*time.Second { - fmt.Println("[Deprecated] flag --dial-timeout is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_DIAL__TIMEOUT environment variable instead.") + fmt.Println("[Deprecated] flag --dial-timeout is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_DIAL__TIMEOUT instead.") config.TelemetryStore.Connection.DialTimeout = deprecatedFlags.DialTimeout } + + if os.Getenv("ALERTMANAGER_API_PREFIX") != "" { + fmt.Println("[Deprecated] env ALERTMANAGER_API_PREFIX is deprecated and scheduled for removal. 
Please use SIGNOZ_ALERTMANAGER_LEGACY_API__URL instead.") + config.Alertmanager.Legacy.ApiURL = os.Getenv("ALERTMANAGER_API_PREFIX") + } + + if os.Getenv("ALERTMANAGER_API_CHANNEL_PATH") != "" { + fmt.Println("[Deprecated] env ALERTMANAGER_API_CHANNEL_PATH is deprecated and scheduled for complete removal.") + } } diff --git a/pkg/signoz/provider.go b/pkg/signoz/provider.go index e98a99d394..5666fa8006 100644 --- a/pkg/signoz/provider.go +++ b/pkg/signoz/provider.go @@ -1,6 +1,9 @@ package signoz import ( + "go.signoz.io/signoz/pkg/alertmanager" + "go.signoz.io/signoz/pkg/alertmanager/legacyalertmanager" + "go.signoz.io/signoz/pkg/alertmanager/signozalertmanager" "go.signoz.io/signoz/pkg/cache" "go.signoz.io/signoz/pkg/cache/memorycache" "go.signoz.io/signoz/pkg/cache/rediscache" @@ -33,6 +36,9 @@ type ProviderConfig struct { // Map of all telemetrystore provider factories TelemetryStoreProviderFactories factory.NamedMap[factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config]] + + // Map of all alertmanager provider factories + AlertmanagerProviderFactories factory.NamedMap[factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config]] } func NewProviderConfig() ProviderConfig { @@ -69,3 +75,10 @@ func NewProviderConfig() ProviderConfig { ), } } + +func NewAlertmanagerProviderFactories(sqlstore sqlstore.SQLStore) factory.NamedMap[factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config]] { + return factory.MustNewNamedMap( + legacyalertmanager.NewFactory(sqlstore), + signozalertmanager.NewFactory(sqlstore), + ) +} diff --git a/pkg/signoz/provider_test.go b/pkg/signoz/provider_test.go index 82ee0d6559..5dadf9b5bf 100644 --- a/pkg/signoz/provider_test.go +++ b/pkg/signoz/provider_test.go @@ -14,3 +14,9 @@ func TestNewProviderConfig(t *testing.T) { NewProviderConfig() }) } + +func TestNewAlertmanagerProviderFactories(t *testing.T) { + assert.NotPanics(t, func() { + NewAlertmanagerProviderFactories(nil) + }) +} diff --git 
a/pkg/signoz/signoz.go b/pkg/signoz/signoz.go index c4fe9957e0..d7dc439db0 100644 --- a/pkg/signoz/signoz.go +++ b/pkg/signoz/signoz.go @@ -3,6 +3,7 @@ package signoz import ( "context" + "go.signoz.io/signoz/pkg/alertmanager" "go.signoz.io/signoz/pkg/cache" "go.signoz.io/signoz/pkg/factory" "go.signoz.io/signoz/pkg/instrumentation" @@ -16,10 +17,12 @@ import ( ) type SigNoz struct { + *factory.Registry Cache cache.Cache Web web.Web SQLStore sqlstore.SQLStore TelemetryStore telemetrystore.TelemetryStore + Alertmanager alertmanager.Alertmanager } func New( @@ -102,10 +105,32 @@ func New( return nil, err } + alertmanager, err := factory.NewProviderFromNamedMap( + ctx, + providerSettings, + config.Alertmanager, + NewAlertmanagerProviderFactories(sqlstore), + config.Alertmanager.Provider, + ) + if err != nil { + return nil, err + } + + registry, err := factory.NewRegistry( + instrumentation.Logger(), + factory.NewNamedService(factory.MustNewName("instrumentation"), instrumentation), + factory.NewNamedService(factory.MustNewName("alertmanager"), alertmanager), + ) + if err != nil { + return nil, err + } + return &SigNoz{ + Registry: registry, Cache: cache, Web: web, SQLStore: sqlstore, TelemetryStore: telemetrystore, + Alertmanager: alertmanager, }, nil } diff --git a/pkg/types/alertmanagertypes/config.go b/pkg/types/alertmanagertypes/config.go index be29a560ec..a4a4e314cb 100644 --- a/pkg/types/alertmanagertypes/config.go +++ b/pkg/types/alertmanagertypes/config.go @@ -282,6 +282,9 @@ type ConfigStore interface { // ListChannels returns the list of channels. ListChannels(context.Context, string) ([]*Channel, error) + + // ListAllChannels returns the list of channels for all organizations. + ListAllChannels(context.Context) ([]*Channel, error) } // MarshalSecretValue if set to true will expose Secret type