From 918c8942c469fb711a92b3b7545477095db413b8 Mon Sep 17 00:00:00 2001 From: Vibhu Pandey Date: Tue, 18 Feb 2025 13:06:31 +0530 Subject: [PATCH] feat(alertmanager): add service for alertmanager (#7136) ### Summary - adds an alertmanager service --- pkg/alertmanager/alertmanager.go | 25 ++ .../sqlalertmanagerstore/config.go | 100 +++++++ .../sqlalertmanagerstore/state.go | 69 +++++ pkg/alertmanager/api.go | 254 ++++++++++++++++++ pkg/alertmanager/config.go | 42 +++ .../internalalertmanager/provider.go | 70 +++++ pkg/alertmanager/server/server.go | 55 +++- pkg/alertmanager/service.go | 137 ++++++++++ pkg/errors/code.go | 10 + .../alertmanagertypestest/state.go | 38 +-- pkg/types/alertmanagertypes/channel.go | 15 ++ pkg/types/alertmanagertypes/config.go | 59 ++-- pkg/types/alertmanagertypes/state.go | 67 ++++- 13 files changed, 884 insertions(+), 57 deletions(-) create mode 100644 pkg/alertmanager/alertmanager.go create mode 100644 pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore/config.go create mode 100644 pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore/state.go create mode 100644 pkg/alertmanager/api.go create mode 100644 pkg/alertmanager/config.go create mode 100644 pkg/alertmanager/internalalertmanager/provider.go create mode 100644 pkg/alertmanager/service.go diff --git a/pkg/alertmanager/alertmanager.go b/pkg/alertmanager/alertmanager.go new file mode 100644 index 0000000000..b83f392caa --- /dev/null +++ b/pkg/alertmanager/alertmanager.go @@ -0,0 +1,25 @@ +package alertmanager + +import ( + "context" + + "go.signoz.io/signoz/pkg/errors" + "go.signoz.io/signoz/pkg/factory" + "go.signoz.io/signoz/pkg/types/alertmanagertypes" +) + +var ( + ErrCodeAlertmanagerNotFound = errors.MustNewCode("alertmanager_not_found") +) + +type Alertmanager interface { + factory.Service + // GetAlerts gets the alerts from the alertmanager per organization. + GetAlerts(context.Context, string, alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error) + + // PutAlerts puts the alerts into the alertmanager per organization. + PutAlerts(context.Context, string, alertmanagertypes.PostableAlerts) error + + // TestReceiver sends a test alert to a receiver. + TestReceiver(context.Context, string, alertmanagertypes.Receiver) error +} diff --git a/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore/config.go b/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore/config.go new file mode 100644 index 0000000000..a50d3d8b51 --- /dev/null +++ b/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore/config.go @@ -0,0 +1,100 @@ +package sqlalertmanagerstore + +import ( + "context" + "database/sql" + + "go.signoz.io/signoz/pkg/errors" + "go.signoz.io/signoz/pkg/sqlstore" + "go.signoz.io/signoz/pkg/types/alertmanagertypes" +) + +type config struct { + sqlstore sqlstore.SQLStore +} + +func NewConfigStore(sqlstore sqlstore.SQLStore) alertmanagertypes.ConfigStore { + return &config{sqlstore: sqlstore} +} + +// Get implements alertmanagertypes.ConfigStore. +func (store *config) Get(ctx context.Context, orgID string) (*alertmanagertypes.Config, error) { + storeableConfig := new(alertmanagertypes.StoreableConfig) + + err := store. + sqlstore. + BunDB(). + NewSelect(). + Model(storeableConfig). + Where("org_id = ?", orgID). + Scan(ctx) + if err != nil { + if err == sql.ErrNoRows { + return nil, errors.Newf(errors.TypeNotFound, alertmanagertypes.ErrCodeAlertmanagerConfigNotFound, "cannot find alertmanager config for orgID %s", orgID) + } + + return nil, err + } + + cfg, err := alertmanagertypes.NewConfigFromStoreableConfig(storeableConfig) + if err != nil { + return nil, err + } + + return cfg, nil +} + +// Set implements alertmanagertypes.ConfigStore. +func (store *config) Set(ctx context.Context, config *alertmanagertypes.Config) error { + tx, err := store.sqlstore.BunDB().BeginTx(ctx, nil) + if err != nil { + return err + } + + defer tx.Rollback() //nolint:errcheck + + if _, err = tx. + NewInsert(). + Model(config.StoreableConfig()). + On("CONFLICT (org_id) DO UPDATE"). + Set("config = ?", string(config.StoreableConfig().Config)). + Set("updated_at = ?", config.StoreableConfig().UpdatedAt). + Exec(ctx); err != nil { + return err + } + + channels := config.Channels() + if len(channels) != 0 { + if _, err = tx.NewInsert(). + Model(&channels). + On("CONFLICT (name) DO UPDATE"). + Set("data = EXCLUDED.data"). + Set("updated_at = EXCLUDED.updated_at"). + Exec(ctx); err != nil { + return err + } + } + + if err = tx.Commit(); err != nil { + return err + } + + return nil +} + +func (store *config) ListOrgs(ctx context.Context) ([]string, error) { + var orgIDs []string + + err := store. + sqlstore. + BunDB(). + NewSelect(). + Table("organizations"). + ColumnExpr("id"). + Scan(ctx, &orgIDs) + if err != nil { + return nil, err + } + + return orgIDs, nil +} diff --git a/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore/state.go b/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore/state.go new file mode 100644 index 0000000000..008db2cda3 --- /dev/null +++ b/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore/state.go @@ -0,0 +1,69 @@ +package sqlalertmanagerstore + +import ( + "context" + "database/sql" + + "go.signoz.io/signoz/pkg/errors" + "go.signoz.io/signoz/pkg/sqlstore" + "go.signoz.io/signoz/pkg/types/alertmanagertypes" +) + +type state struct { + sqlstore sqlstore.SQLStore +} + +func NewStateStore(sqlstore sqlstore.SQLStore) alertmanagertypes.StateStore { + return &state{sqlstore: sqlstore} +} + +// Get implements alertmanagertypes.StateStore. +func (store *state) Get(ctx context.Context, orgID string) (*alertmanagertypes.StoreableState, error) { + storeableState := new(alertmanagertypes.StoreableState) + + err := store. + sqlstore. + BunDB(). + NewSelect(). + Model(storeableState). + Where("org_id = ?", orgID). + Scan(ctx) + if err != nil { + if err == sql.ErrNoRows { + return nil, errors.Newf(errors.TypeNotFound, alertmanagertypes.ErrCodeAlertmanagerStateNotFound, "cannot find alertmanager state for org %s", orgID) + } + + return nil, err + } + + return storeableState, nil +} + +// Set implements alertmanagertypes.StateStore. +func (store *state) Set(ctx context.Context, orgID string, storeableState *alertmanagertypes.StoreableState) error { + tx, err := store.sqlstore.BunDB().BeginTx(ctx, nil) + if err != nil { + return err + } + + defer tx.Rollback() //nolint:errcheck + + _, err = tx. + NewInsert(). + Model(storeableState). + On("CONFLICT (org_id) DO UPDATE"). + Set("silences = EXCLUDED.silences"). + Set("nflog = EXCLUDED.nflog"). + Set("updated_at = EXCLUDED.updated_at"). + Where("org_id = ?", orgID). + Exec(ctx) + if err != nil { + return err + } + + if err := tx.Commit(); err != nil { + return err + } + + return nil +} diff --git a/pkg/alertmanager/api.go b/pkg/alertmanager/api.go new file mode 100644 index 0000000000..beb51d4fad --- /dev/null +++ b/pkg/alertmanager/api.go @@ -0,0 +1,254 @@ +package alertmanager + +import ( + "context" + "io" + "net/http" + "strconv" + "time" + + "github.com/gorilla/mux" + "go.signoz.io/signoz/pkg/errors" + "go.signoz.io/signoz/pkg/http/render" + "go.signoz.io/signoz/pkg/types/alertmanagertypes" + "go.signoz.io/signoz/pkg/types/authtypes" +) + +type API struct { + configStore alertmanagertypes.ConfigStore + alertmanager Alertmanager +} + +func NewAPI(configStore alertmanagertypes.ConfigStore, alertmanager Alertmanager) *API { + return &API{ + configStore: configStore, + alertmanager: alertmanager, + } +} + +func (api *API) GetAlerts(req *http.Request, rw http.ResponseWriter) { + ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second) + defer cancel() + + claims, ok := authtypes.ClaimsFromContext(ctx) + if !ok { + render.Error(rw, errors.Newf(errors.TypeUnauthenticated, errors.CodeUnauthenticated, "unauthenticated")) + return + } + + params, err := alertmanagertypes.NewGettableAlertsParams(req) + if err != nil { + render.Error(rw, err) + return + } + + alerts, err := api.alertmanager.GetAlerts(ctx, claims.OrgID, params) + if err != nil { + render.Error(rw, err) + return + } + + render.Success(rw, http.StatusOK, alerts) +} + +func (api *API) TestReceiver(req *http.Request, rw http.ResponseWriter) { + ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second) + defer cancel() + + claims, ok := authtypes.ClaimsFromContext(ctx) + if !ok { + render.Error(rw, errors.Newf(errors.TypeUnauthenticated, errors.CodeUnauthenticated, "unauthenticated")) + return + } + + body, err := io.ReadAll(req.Body) + if err != nil { + render.Error(rw, err) + return + } + defer req.Body.Close() //nolint:errcheck + + receiver, err := alertmanagertypes.NewReceiver(string(body)) + if err != nil { + render.Error(rw, err) + return + } + + err = api.alertmanager.TestReceiver(ctx, claims.OrgID, receiver) + if err != nil { + render.Error(rw, err) + return + } + + render.Success(rw, http.StatusNoContent, nil) +} + +func (api *API) GetChannels(req *http.Request, rw http.ResponseWriter) { + ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second) + defer cancel() + + claims, ok := authtypes.ClaimsFromContext(ctx) + if !ok { + render.Error(rw, errors.Newf(errors.TypeUnauthenticated, errors.CodeUnauthenticated, "unauthenticated")) + return + } + + config, err := api.configStore.Get(ctx, claims.OrgID) + if err != nil { + render.Error(rw, err) + return + } + + channels := config.Channels() + + channelList := make([]*alertmanagertypes.Channel, 0, len(channels)) + for _, channel := range channels { + channelList = append(channelList, channel) + } + + render.Success(rw, http.StatusOK, channelList) +} + +func (api *API) GetChannelByID(req *http.Request, rw http.ResponseWriter) { + ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second) + defer cancel() + + claims, ok := authtypes.ClaimsFromContext(ctx) + if !ok { + render.Error(rw, errors.Newf(errors.TypeUnauthenticated, errors.CodeUnauthenticated, "unauthenticated")) + return + } + + vars := mux.Vars(req) + if vars == nil { + render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is required in path")) + return + } + + idString, ok := vars["id"] + if !ok { + render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is required in path")) + return + } + + id, err := strconv.Atoi(idString) + if err != nil { + render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is not a valid integer")) + return + } + + config, err := api.configStore.Get(ctx, claims.OrgID) + if err != nil { + render.Error(rw, err) + return + } + + channels := config.Channels() + channel, err := alertmanagertypes.GetChannelByID(channels, id) + if err != nil { + render.Error(rw, err) + return + } + + render.Success(rw, http.StatusOK, channel) +} + +func (api *API) UpdateChannelByID(req *http.Request, rw http.ResponseWriter) { + ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second) + defer cancel() + + claims, ok := authtypes.ClaimsFromContext(ctx) + if !ok { + render.Error(rw, errors.Newf(errors.TypeUnauthenticated, errors.CodeUnauthenticated, "unauthenticated")) + return + } + + body, err := io.ReadAll(req.Body) + if err != nil { + render.Error(rw, err) + return + } + defer req.Body.Close() //nolint:errcheck + + receiver, err := alertmanagertypes.NewReceiver(string(body)) + if err != nil { + render.Error(rw, err) + return + } + + config, err := api.configStore.Get(ctx, claims.OrgID) + if err != nil { + render.Error(rw, err) + return + } + + err = config.UpdateReceiver(alertmanagertypes.NewRouteFromReceiver(receiver), receiver) + if err != nil { + render.Error(rw, err) + return + } + + err = api.configStore.Set(ctx, config) + if err != nil { + render.Error(rw, err) + return + } + + render.Success(rw, http.StatusNoContent, nil) +} + +func (api *API) DeleteChannelByID(req *http.Request, rw http.ResponseWriter) { + ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second) + defer cancel() + + claims, ok := authtypes.ClaimsFromContext(ctx) + if !ok { + render.Error(rw, errors.Newf(errors.TypeUnauthenticated, errors.CodeUnauthenticated, "unauthenticated")) + return + } + + vars := mux.Vars(req) + if vars == nil { + render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is required in path")) + return + } + + idString, ok := vars["id"] + if !ok { + render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is required in path")) + return + } + + id, err := strconv.Atoi(idString) + if err != nil { + render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "id is not a valid integer")) + return + } + + config, err := api.configStore.Get(ctx, claims.OrgID) + if err != nil { + render.Error(rw, err) + return + } + + channels := config.Channels() + channel, err := alertmanagertypes.GetChannelByID(channels, id) + if err != nil { + render.Error(rw, err) + return + } + + err = config.DeleteReceiver(channel.Name) + if err != nil { + render.Error(rw, err) + return + } + + err = api.configStore.Set(ctx, config) + if err != nil { + render.Error(rw, err) + return + } + + render.Success(rw, http.StatusNoContent, nil) +} diff --git a/pkg/alertmanager/config.go b/pkg/alertmanager/config.go new file mode 100644 index 0000000000..e5fd4fe148 --- /dev/null +++ b/pkg/alertmanager/config.go @@ -0,0 +1,42 @@ +package alertmanager + +import ( + "time" + + "go.signoz.io/signoz/pkg/alertmanager/server" + "go.signoz.io/signoz/pkg/factory" +) + +type Config struct { + // Config is the config for the alertmanager server. + server.Config `mapstructure:",squash"` + + // Provider is the provider for the alertmanager service. + Provider string `mapstructure:"provider"` + + // Internal is the internal alertmanager configuration. + Internal Internal `mapstructure:"internal"` +} + +type Internal struct { + // PollInterval is the interval at which the alertmanager is synced. + PollInterval time.Duration `mapstructure:"poll_interval"` +} + +func NewConfigFactory() factory.ConfigFactory { + return factory.NewConfigFactory(factory.MustNewName("alertmanager"), newConfig) +} + +func newConfig() factory.Config { + return Config{ + Config: server.NewConfig(), + Provider: "internal", + Internal: Internal{ + PollInterval: 15 * time.Second, + }, + } +} + +func (c Config) Validate() error { + return nil +} diff --git a/pkg/alertmanager/internalalertmanager/provider.go b/pkg/alertmanager/internalalertmanager/provider.go new file mode 100644 index 0000000000..022d458a59 --- /dev/null +++ b/pkg/alertmanager/internalalertmanager/provider.go @@ -0,0 +1,70 @@ +package internalalertmanager + +import ( + "context" + "time" + + "go.signoz.io/signoz/pkg/alertmanager" + "go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore" + "go.signoz.io/signoz/pkg/factory" + "go.signoz.io/signoz/pkg/sqlstore" + "go.signoz.io/signoz/pkg/types/alertmanagertypes" +) + +type provider struct { + service *alertmanager.Service + config alertmanager.Config + settings factory.ScopedProviderSettings +} + +func NewFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config] { + return factory.NewProviderFactory(factory.MustNewName("internal"), func(ctx context.Context, settings factory.ProviderSettings, config alertmanager.Config) (alertmanager.Alertmanager, error) { + return New(ctx, settings, config, sqlstore) + }) +} + +func New(ctx context.Context, providerSettings factory.ProviderSettings, config alertmanager.Config, sqlstore sqlstore.SQLStore) (alertmanager.Alertmanager, error) { + settings := factory.NewScopedProviderSettings(providerSettings, "go.signoz.io/signoz/pkg/alertmanager/internalalertmanager") + return &provider{ + service: alertmanager.New( + ctx, + settings, + config, + sqlalertmanagerstore.NewStateStore(sqlstore), + sqlalertmanagerstore.NewConfigStore(sqlstore), + ), + settings: settings, + config: config, + }, nil +} + +func (provider *provider) Start(ctx context.Context) error { + ticker := time.NewTicker(provider.config.Internal.PollInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + if err := provider.service.SyncServers(ctx); err != nil { + provider.settings.Logger().ErrorContext(ctx, "failed to sync alertmanager servers", "error", err) + } + } + } +} + +func (provider *provider) Stop(ctx context.Context) error { + return provider.service.Stop(ctx) +} + +func (provider *provider) GetAlerts(ctx context.Context, orgID string, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error) { + return provider.service.GetAlerts(ctx, orgID, params) +} + +func (provider *provider) PutAlerts(ctx context.Context, orgID string, alerts alertmanagertypes.PostableAlerts) error { + return provider.service.PutAlerts(ctx, orgID, alerts) +} + +func (provider *provider) TestReceiver(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error { + return provider.service.TestReceiver(ctx, orgID, receiver) +} diff --git a/pkg/alertmanager/server/server.go b/pkg/alertmanager/server/server.go index 1480a3d50f..06252beec8 100644 --- a/pkg/alertmanager/server/server.go +++ b/pkg/alertmanager/server/server.go @@ -34,7 +34,7 @@ type Server struct { logger *slog.Logger // registry is the prometheus registry for the alertmanager - registry *prometheus.Registry + registry prometheus.Registerer // srvConfig is the server config for the alertmanager srvConfig Config @@ -64,7 +64,7 @@ type Server struct { stopc chan struct{} } -func New(ctx context.Context, logger *slog.Logger, registry *prometheus.Registry, srvConfig Config, orgID string, stateStore alertmanagertypes.StateStore) (*Server, error) { +func New(ctx context.Context, logger *slog.Logger, registry prometheus.Registerer, srvConfig Config, orgID string, stateStore alertmanagertypes.StateStore) (*Server, error) { server := &Server{ logger: logger.With("pkg", "go.signoz.io/pkg/alertmanager/server"), registry: registry, @@ -77,20 +77,18 @@ func New(ctx context.Context, logger *slog.Logger, registry *prometheus.Registry server.marker = alertmanagertypes.NewMarker(server.registry) // get silences for initial state - silencesstate, err := server.stateStore.Get(ctx, server.orgID, alertmanagertypes.SilenceStateName) + state, err := server.stateStore.Get(ctx, server.orgID) if err != nil && !errors.Ast(err, errors.TypeNotFound) { return nil, err } - // get nflog for initial state - nflogstate, err := server.stateStore.Get(ctx, server.orgID, alertmanagertypes.NFLogStateName) - if err != nil && !errors.Ast(err, errors.TypeNotFound) { - return nil, err + silencesSnapshot := "" + if state != nil { + silencesSnapshot = state.Silences } - // Initialize silences server.silences, err = silence.New(silence.Options{ - SnapshotReader: strings.NewReader(silencesstate), + SnapshotReader: strings.NewReader(silencesSnapshot), Retention: srvConfig.Silences.Retention, Limits: silence.Limits{ MaxSilences: func() int { return srvConfig.Silences.Max }, @@ -103,9 +101,14 @@ func New(ctx context.Context, logger *slog.Logger, registry *prometheus.Registry return nil, err } + nflogSnapshot := "" + if state != nil { + nflogSnapshot = state.NFLog + } + // Initialize notification log server.nflog, err = nflog.New(nflog.Options{ - SnapshotReader: strings.NewReader(nflogstate), + SnapshotReader: strings.NewReader(nflogSnapshot), Retention: server.srvConfig.NFLog.Retention, Metrics: server.registry, Logger: server.logger, @@ -125,7 +128,21 @@ func New(ctx context.Context, logger *slog.Logger, registry *prometheus.Registry // Don't return here - we need to snapshot our state first. } - return server.stateStore.Set(ctx, server.orgID, alertmanagertypes.SilenceStateName, server.silences) + state, err := server.stateStore.Get(ctx, server.orgID) + if err != nil && !errors.Ast(err, errors.TypeNotFound) { + return 0, err + } + + if state == nil { + state = alertmanagertypes.NewStoreableState(server.orgID) + } + + c, err := state.Set(alertmanagertypes.SilenceStateName, server.silences) + if err != nil { + return 0, err + } + + return c, server.stateStore.Set(ctx, server.orgID, state) }) }() @@ -140,7 +157,21 @@ func New(ctx context.Context, logger *slog.Logger, registry *prometheus.Registry // Don't return without saving the current state. } - return server.stateStore.Set(ctx, server.orgID, alertmanagertypes.NFLogStateName, server.nflog) + state, err := server.stateStore.Get(ctx, server.orgID) + if err != nil && !errors.Ast(err, errors.TypeNotFound) { + return 0, err + } + + if state == nil { + state = alertmanagertypes.NewStoreableState(server.orgID) + } + + c, err := state.Set(alertmanagertypes.NFLogStateName, server.nflog) + if err != nil { + return 0, err + } + + return c, server.stateStore.Set(ctx, server.orgID, state) }) }() diff --git a/pkg/alertmanager/service.go b/pkg/alertmanager/service.go new file mode 100644 index 0000000000..6ec9c03081 --- /dev/null +++ b/pkg/alertmanager/service.go @@ -0,0 +1,137 @@ +package alertmanager + +import ( + "context" + "sync" + + "go.signoz.io/signoz/pkg/alertmanager/server" + "go.signoz.io/signoz/pkg/errors" + "go.signoz.io/signoz/pkg/factory" + "go.signoz.io/signoz/pkg/types/alertmanagertypes" +) + +type Service struct { + // config is the config for the alertmanager service + config Config + + // stateStore is the state store for the alertmanager service + stateStore alertmanagertypes.StateStore + + // configStore is the config store for the alertmanager service + configStore alertmanagertypes.ConfigStore + + // settings is the settings for the alertmanager service + settings factory.ScopedProviderSettings + + // Map of organization id to alertmanager server + servers map[string]*server.Server + + // Mutex to protect the servers map + serversMtx sync.RWMutex +} + +func New(ctx context.Context, settings factory.ScopedProviderSettings, config Config, stateStore alertmanagertypes.StateStore, configStore alertmanagertypes.ConfigStore) *Service { + service := &Service{ + config: config, + stateStore: stateStore, + configStore: configStore, + settings: settings, + servers: make(map[string]*server.Server), + serversMtx: sync.RWMutex{}, + } + + return service +} + +func (service *Service) SyncServers(ctx context.Context) error { + orgIDs, err := service.configStore.ListOrgs(ctx) + if err != nil { + return err + } + + service.serversMtx.Lock() + for _, orgID := range orgIDs { + config, err := service.getConfig(ctx, orgID) + if err != nil { + service.settings.Logger().Error("failed to get alertmanagerconfig for org", "orgID", orgID, "error", err) + continue + } + + service.servers[orgID], err = server.New(ctx, service.settings.Logger(), service.settings.PrometheusRegisterer(), server.Config{}, orgID, service.stateStore) + if err != nil { + service.settings.Logger().Error("failed to create alertmanagerserver", "orgID", orgID, "error", err) + continue + } + + err = service.servers[orgID].SetConfig(ctx, config) + if err != nil { + service.settings.Logger().Error("failed to set config for alertmanager server", "orgID", orgID, "error", err) + continue + } + } + service.serversMtx.Unlock() + + return nil +} + +func (service *Service) GetAlerts(ctx context.Context, orgID string, params alertmanagertypes.GettableAlertsParams) (alertmanagertypes.GettableAlerts, error) { + server, err := service.getServer(orgID) + if err != nil { + return nil, err + } + + return server.GetAlerts(ctx, params) +} + +func (service *Service) PutAlerts(ctx context.Context, orgID string, alerts alertmanagertypes.PostableAlerts) error { + server, err := service.getServer(orgID) + if err != nil { + return err + } + + return server.PutAlerts(ctx, alerts) +} + +func (service *Service) TestReceiver(ctx context.Context, orgID string, receiver alertmanagertypes.Receiver) error { + server, err := service.getServer(orgID) + if err != nil { + return err + } + + return server.TestReceiver(ctx, receiver) +} + +func (service *Service) Stop(ctx context.Context) error { + for _, server := range service.servers { + server.Stop(ctx) + } + + return nil +} + +func (service *Service) getConfig(ctx context.Context, orgID string) (*alertmanagertypes.Config, error) { + config, err := service.configStore.Get(ctx, orgID) + if err != nil { + if !errors.Ast(err, errors.TypeNotFound) { + return nil, err + } + + config, err = alertmanagertypes.NewDefaultConfig(service.config.Global, service.config.Route, orgID) + if err != nil { + return nil, err + } + + return config, err + } + + return config, nil +} + +func (service *Service) getServer(orgID string) (*server.Server, error) { + server, ok := service.servers[orgID] + if !ok { + return nil, errors.Newf(errors.TypeNotFound, ErrCodeAlertmanagerNotFound, "alertmanager not found for org %s", orgID) + } + + return server, nil +} diff --git a/pkg/errors/code.go b/pkg/errors/code.go index bdd8e16b6a..84df6a2f5b 100644 --- a/pkg/errors/code.go +++ b/pkg/errors/code.go @@ -5,6 +5,16 @@ import ( "regexp" ) +var ( + CodeInvalidInput code = code{"invalid_input"} + CodeInternal = code{"internal"} + CodeUnsupported = code{"unsupported"} + CodeNotFound = code{"not_found"} + CodeMethodNotAllowed = code{"method_not_allowed"} + CodeAlreadyExists = code{"already_exists"} + CodeUnauthenticated = code{"unauthenticated"} +) + var ( codeRegex = regexp.MustCompile(`^[a-z_]+$`) ) diff --git a/pkg/types/alertmanagertypes/alertmanagertypestest/state.go b/pkg/types/alertmanagertypes/alertmanagertypestest/state.go index f52f59e364..4aa4b5bbaf 100644 --- a/pkg/types/alertmanagertypes/alertmanagertypestest/state.go +++ b/pkg/types/alertmanagertypes/alertmanagertypestest/state.go @@ -2,54 +2,36 @@ package alertmanagertypestest import ( "context" - "encoding/base64" "sync" "go.signoz.io/signoz/pkg/errors" "go.signoz.io/signoz/pkg/types/alertmanagertypes" ) +var _ alertmanagertypes.StateStore = (*StateStore)(nil) + type StateStore struct { - states map[string]map[string]string + states map[string]*alertmanagertypes.StoreableState mtx sync.RWMutex } func NewStateStore() *StateStore { return &StateStore{ - states: make(map[string]map[string]string), + states: make(map[string]*alertmanagertypes.StoreableState), } } -func (s *StateStore) Set(ctx context.Context, orgID string, stateName alertmanagertypes.StateName, state alertmanagertypes.State) (int64, error) { - if _, ok := s.states[orgID]; !ok { - s.states[orgID] = make(map[string]string) - } - - bytes, err := state.MarshalBinary() - if err != nil { - return 0, err - } - +func (s *StateStore) Set(ctx context.Context, orgID string, storeableState *alertmanagertypes.StoreableState) error { s.mtx.Lock() - s.states[orgID][stateName.String()] = base64.StdEncoding.EncodeToString(bytes) + s.states[orgID] = storeableState s.mtx.Unlock() - return int64(len(bytes)), nil + return nil } -func (s *StateStore) Get(ctx context.Context, orgID string, stateName alertmanagertypes.StateName) (string, error) { +func (s *StateStore) Get(ctx context.Context, orgID string) (*alertmanagertypes.StoreableState, error) { if _, ok := s.states[orgID]; !ok { - return "", errors.Newf(errors.TypeNotFound, alertmanagertypes.ErrCodeAlertmanagerStateNotFound, "state %q for orgID %q not found", stateName.String(), orgID) - } - - state, ok := s.states[orgID][stateName.String()] - if !ok { - return "", errors.Newf(errors.TypeNotFound, alertmanagertypes.ErrCodeAlertmanagerStateNotFound, "state %q for orgID %q not found", stateName.String(), orgID) - } - - bytes, err := base64.StdEncoding.DecodeString(state) - if err != nil { - return "", err + return nil, errors.Newf(errors.TypeNotFound, alertmanagertypes.ErrCodeAlertmanagerStateNotFound, "state for orgID %q not found", orgID) } - return string(bytes), nil + return s.states[orgID], nil } diff --git a/pkg/types/alertmanagertypes/channel.go b/pkg/types/alertmanagertypes/channel.go index 69d451aa02..48f86ff6ba 100644 --- a/pkg/types/alertmanagertypes/channel.go +++ b/pkg/types/alertmanagertypes/channel.go @@ -8,6 +8,11 @@ import ( "github.com/prometheus/alertmanager/config" "github.com/uptrace/bun" + "go.signoz.io/signoz/pkg/errors" +) + +var ( + ErrCodeAlertmanagerChannelNotFound = errors.MustNewCode("alertmanager_channel_not_found") ) var ( @@ -137,3 +142,13 @@ func NewConfigFromChannels(globalConfig GlobalConfig, routeConfig RouteConfig, c return cfg, nil } + +func GetChannelByID(channels Channels, id int) (*Channel, error) { + for _, channel := range channels { + if channel.ID == id { + return channel, nil + } + } + + return nil, errors.Newf(errors.TypeNotFound, ErrCodeAlertmanagerChannelNotFound, "cannot find channel with id %d", id) +} diff --git a/pkg/types/alertmanagertypes/config.go b/pkg/types/alertmanagertypes/config.go index fbef086182..58c5750a7a 100644 --- a/pkg/types/alertmanagertypes/config.go +++ b/pkg/types/alertmanagertypes/config.go @@ -1,6 +1,7 @@ package alertmanagertypes import ( + "context" "crypto/md5" "encoding/json" "fmt" @@ -19,6 +20,7 @@ const ( var ( ErrCodeAlertmanagerConfigInvalid = errors.MustNewCode("alertmanager_config_invalid") + ErrCodeAlertmanagerConfigNotFound = errors.MustNewCode("alertmanager_config_not_found") ErrCodeAlertmanagerConfigConflict = errors.MustNewCode("alertmanager_config_conflict") ) @@ -37,13 +39,11 @@ type RouteConfig struct { type StoreableConfig struct { bun.BaseModel `bun:"table:alertmanager_config"` - ID uint64 `bun:"id"` - Config string `bun:"config"` - SilencesState string `bun:"silences_state,nullzero"` - NFLogState string `bun:"nflog_state,nullzero"` - CreatedAt time.Time `bun:"created_at"` - UpdatedAt time.Time `bun:"updated_at"` - OrgID string `bun:"org_id"` + ID uint64 `bun:"id,pk,autoincrement"` + Config string `bun:"config"` + CreatedAt time.Time `bun:"created_at"` + UpdatedAt time.Time `bun:"updated_at"` + OrgID string `bun:"org_id"` } // Config is the type for the entire alertmanager configuration @@ -66,17 +66,35 @@ func NewConfig(c *config.Config, orgID string) *Config { return &Config{ alertmanagerConfig: c, storeableConfig: &StoreableConfig{ - Config: string(newRawFromConfig(c)), - SilencesState: "", - NFLogState: "", - CreatedAt: time.Now(), - UpdatedAt: time.Now(), - OrgID: orgID, + Config: string(newRawFromConfig(c)), + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + OrgID: orgID, }, channels: channels, } } +func NewConfigFromStoreableConfig(sc *StoreableConfig) (*Config, error) { + alertmanagerConfig, err := newConfigFromString(sc.Config) + if err != nil { + return nil, err + } + + channels := NewChannelsFromConfig(alertmanagerConfig, sc.OrgID) + + return &Config{ + alertmanagerConfig: alertmanagerConfig, + storeableConfig: sc, + channels: channels, + orgID: sc.OrgID, + }, nil +} + +func NewRouteFromReceiver(receiver Receiver) *config.Route { + return &config.Route{Receiver: receiver.Name, Continue: true} +} + func NewDefaultConfig(globalConfig GlobalConfig, routeConfig RouteConfig, orgID string) (*Config, error) { err := mergo.Merge(&globalConfig, config.DefaultGlobalConfig()) if err != nil { @@ -96,14 +114,14 @@ func NewDefaultConfig(globalConfig GlobalConfig, routeConfig RouteConfig, orgID }, orgID), nil } -func NewConfigFromString(s string, orgID string) (*Config, error) { +func newConfigFromString(s string) (*config.Config, error) { config := new(config.Config) err := json.Unmarshal([]byte(s), config) if err != nil { return nil, err } - return NewConfig(config, orgID), nil + return config, nil } func newRawFromConfig(c *config.Config) []byte { @@ -240,6 +258,17 @@ func (c *Config) DeleteReceiver(name string) error { return nil } +type ConfigStore interface { + // Set creates or updates a config. + Set(context.Context, *Config) error + + // Get returns the config for the given orgID + Get(context.Context, string) (*Config, error) + + // ListOrgs returns the list of orgs + ListOrgs(context.Context) ([]string, error) +} + // MarshalSecretValue if set to true will expose Secret type // through the marshal interfaces. We need to store the actual value of the secret // in the database, so we need to set this to true. diff --git a/pkg/types/alertmanagertypes/state.go b/pkg/types/alertmanagertypes/state.go index 69f9a9eb94..25ac871e10 100644 --- a/pkg/types/alertmanagertypes/state.go +++ b/pkg/types/alertmanagertypes/state.go @@ -2,8 +2,11 @@ package alertmanagertypes import ( "context" + "encoding/base64" + "time" "github.com/prometheus/alertmanager/cluster" + "github.com/uptrace/bun" "go.signoz.io/signoz/pkg/errors" ) @@ -22,6 +25,66 @@ var ( ErrCodeAlertmanagerStateNotFound = errors.MustNewCode("alertmanager_state_not_found") ) +type StoreableState struct { + bun.BaseModel `bun:"table:alertmanager_state"` + + ID uint64 `bun:"id,pk,autoincrement"` + Silences string `bun:"silences,nullzero"` + NFLog string `bun:"nflog,nullzero"` + CreatedAt time.Time `bun:"created_at"` + UpdatedAt time.Time `bun:"updated_at"` + OrgID string `bun:"org_id"` +} + +func NewStoreableState(orgID string) *StoreableState { + return &StoreableState{ + OrgID: orgID, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } +} + +func (s *StoreableState) Set(stateName StateName, state State) (int64, error) { + marshalledState, err := state.MarshalBinary() + if err != nil { + return 0, err + } + encodedState := base64.StdEncoding.EncodeToString(marshalledState) + + switch stateName { + case SilenceStateName: + s.Silences = encodedState + case NFLogStateName: + s.NFLog = encodedState + } + + s.UpdatedAt = time.Now() + + return int64(len(marshalledState)), nil +} + +func (s *StoreableState) Get(stateName StateName) (string, error) { + base64encodedState := "" + + switch stateName { + case SilenceStateName: + base64encodedState = s.Silences + case NFLogStateName: + base64encodedState = s.NFLog + } + + if base64encodedState == "" { + return "", errors.New(errors.TypeNotFound, ErrCodeAlertmanagerStateNotFound, "state not found") + } + + decodedState, err := base64.StdEncoding.DecodeString(base64encodedState) + if err != nil { + return "", err + } + + return string(decodedState), nil +} + type StateName struct { name string } @@ -35,9 +98,9 @@ type StateStore interface { // The return type matches the return of `silence.Maintenance` or `nflog.Maintenance`. // See https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/silence/silence.go#L217 // and https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/nflog/nflog.go#L94 - Set(context.Context, string, StateName, State) (int64, error) + Set(context.Context, string, *StoreableState) error // Gets the silence state or the notification log state as a string from the store. This is used as a snapshot to load the // initial state of silences or notification log when starting the alertmanager. - Get(context.Context, string, StateName) (string, error) + Get(context.Context, string) (*StoreableState, error) }