Skip to content

Commit

Permalink
feat(legacyalertmanager): integrate
Browse files Browse the repository at this point in the history
  • Loading branch information
grandwizard28 committed Feb 20, 2025
1 parent 5ab0d77 commit 309ac97
Show file tree
Hide file tree
Showing 25 changed files with 204 additions and 210 deletions.
5 changes: 4 additions & 1 deletion ee/query-service/app/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"go.signoz.io/signoz/ee/query-service/interfaces"
"go.signoz.io/signoz/ee/query-service/license"
"go.signoz.io/signoz/ee/query-service/usage"
"go.signoz.io/signoz/pkg/alertmanager"
baseapp "go.signoz.io/signoz/pkg/query-service/app"
"go.signoz.io/signoz/pkg/query-service/app/cloudintegrations"
"go.signoz.io/signoz/pkg/query-service/app/integrations"
Expand All @@ -20,6 +21,7 @@ import (
basemodel "go.signoz.io/signoz/pkg/query-service/model"
rules "go.signoz.io/signoz/pkg/query-service/rules"
"go.signoz.io/signoz/pkg/query-service/version"
"go.signoz.io/signoz/pkg/signoz"
"go.signoz.io/signoz/pkg/types/authtypes"
)

Expand Down Expand Up @@ -51,7 +53,7 @@ type APIHandler struct {
}

// NewAPIHandler returns an APIHandler
func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) {
func NewAPIHandler(opts APIHandlerOptions, signoz *signoz.SigNoz) (*APIHandler, error) {

baseHandler, err := baseapp.NewAPIHandler(baseapp.APIHandlerOpts{
Reader: opts.DataConnector,
Expand All @@ -67,6 +69,7 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) {
FluxInterval: opts.FluxInterval,
UseLogsNewSchema: opts.UseLogsNewSchema,
UseTraceNewSchema: opts.UseTraceNewSchema,
AlertmanagerAPI: alertmanager.NewAPI(signoz.Alertmanager),
})

if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion ee/query-service/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
JWT: serverOptions.Jwt,
}

apiHandler, err := api.NewAPIHandler(apiOpts)
apiHandler, err := api.NewAPIHandler(apiOpts, serverOptions.SigNoz)
if err != nil {
return nil, err
}
Expand Down
24 changes: 13 additions & 11 deletions ee/query-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"os"
"os/signal"
"strconv"
"syscall"
"time"

"go.opentelemetry.io/otel/sdk/resource"
Expand Down Expand Up @@ -198,16 +197,19 @@ func main() {
zap.L().Fatal("Failed to initialize auth cache", zap.Error(err))
}

signalsChannel := make(chan os.Signal, 1)
signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM)
signoz.Start(context.Background())

for {
select {
case status := <-server.HealthCheckStatus():
zap.L().Info("Received HealthCheck status: ", zap.Int("status", int(status)))
case <-signalsChannel:
zap.L().Fatal("Received OS Interrupt Signal ... ")
server.Stop()
}
if err := signoz.Wait(context.Background()); err != nil {
zap.L().Fatal("Failed to start signoz", zap.Error(err))
}

err = server.Stop()
if err != nil {
zap.L().Fatal("Failed to stop server", zap.Error(err))
}

err = signoz.Stop(context.Background())
if err != nil {
zap.L().Fatal("Failed to stop signoz", zap.Error(err))
}
}
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ require (
github.com/soheilhy/cmux v0.1.5
github.com/srikanthccv/ClickHouse-go-mock v0.9.0
github.com/stretchr/testify v1.10.0
github.com/tidwall/gjson v1.18.0
github.com/uptrace/bun v1.2.9
github.com/uptrace/bun/dialect/pgdialect v1.2.9
github.com/uptrace/bun/dialect/sqlitedialect v1.2.9
Expand Down Expand Up @@ -204,6 +205,8 @@ require (
github.com/smarty/assertions v1.15.0 // indirect
github.com/spf13/cobra v1.8.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/tklauser/go-sysconf v0.3.13 // indirect
github.com/tklauser/numcpus v0.7.0 // indirect
github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -891,7 +891,13 @@ github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/subosito/gotenv v1.4.1/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0=
github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY=
github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tklauser/go-sysconf v0.3.13 h1:GBUpcahXSpR2xN01jhkNAbTLRk2Yzgggk8IM08lq3r4=
github.com/tklauser/go-sysconf v0.3.13/go.mod h1:zwleP4Q4OehZHGn4CYZDipCgg9usW5IJePewFCGVEa0=
github.com/tklauser/numcpus v0.7.0 h1:yjuerZP127QG9m5Zh/mSO4wqurYil27tHrqwRoRjpr4=
Expand Down
3 changes: 3 additions & 0 deletions pkg/alertmanager/alertmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ type Alertmanager interface {
// ListChannels lists all channels for the organization.
ListChannels(context.Context, string) ([]*alertmanagertypes.Channel, error)

// ListAllChannels lists all channels for all organizations. It is used by the legacy alertmanager only.
ListAllChannels(context.Context) ([]*alertmanagertypes.Channel, error)

// GetChannelByID gets a channel for the organization.
GetChannelByID(context.Context, string, int) (*alertmanagertypes.Channel, error)

Expand Down
2 changes: 0 additions & 2 deletions pkg/alertmanager/alertmanagerbatcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ func (batcher *Batcher) Start(ctx context.Context) error {
go func() {
defer batcher.goroutinesWg.Done()

batcher.logger.InfoContext(ctx, "starting alertmanager batcher")
for {
select {
case <-batcher.stopC:
Expand Down Expand Up @@ -106,7 +105,6 @@ func (batcher *Batcher) Add(ctx context.Context, alerts ...*alertmanagertypes.Po

// Stop shuts down the batcher.
func (batcher *Batcher) Stop(ctx context.Context) {
batcher.logger.InfoContext(ctx, "Stopping alertmanager batcher")
close(batcher.stopC)
batcher.goroutinesWg.Wait()
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,19 @@ func (store *config) ListChannels(ctx context.Context, orgID string) ([]*alertma

return channels, nil
}

func (store *config) ListAllChannels(ctx context.Context) ([]*alertmanagertypes.Channel, error) {
var channels []*alertmanagertypes.Channel

err := store.
sqlstore.
BunDB().
NewSelect().
Model(&channels).
Scan(ctx)
if err != nil {
return nil, err
}

return channels, nil
}
27 changes: 20 additions & 7 deletions pkg/alertmanager/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func NewAPI(alertmanager Alertmanager) *API {
}
}

func (api *API) GetAlerts(req *http.Request, rw http.ResponseWriter) {
func (api *API) GetAlerts(rw http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
defer cancel()

Expand All @@ -49,7 +49,7 @@ func (api *API) GetAlerts(req *http.Request, rw http.ResponseWriter) {
render.Success(rw, http.StatusOK, alerts)
}

func (api *API) TestReceiver(req *http.Request, rw http.ResponseWriter) {
func (api *API) TestReceiver(rw http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
defer cancel()

Expand Down Expand Up @@ -81,7 +81,7 @@ func (api *API) TestReceiver(req *http.Request, rw http.ResponseWriter) {
render.Success(rw, http.StatusNoContent, nil)
}

func (api *API) ListChannels(req *http.Request, rw http.ResponseWriter) {
func (api *API) ListChannels(rw http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
defer cancel()

Expand All @@ -100,7 +100,20 @@ func (api *API) ListChannels(req *http.Request, rw http.ResponseWriter) {
render.Success(rw, http.StatusOK, channels)
}

func (api *API) GetChannelByID(req *http.Request, rw http.ResponseWriter) {
func (api *API) ListAllChannels(rw http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
defer cancel()

channels, err := api.alertmanager.ListAllChannels(ctx)
if err != nil {
render.Error(rw, err)
return
}

render.Success(rw, http.StatusOK, channels)
}

func (api *API) GetChannelByID(rw http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
defer cancel()

Expand Down Expand Up @@ -137,7 +150,7 @@ func (api *API) GetChannelByID(req *http.Request, rw http.ResponseWriter) {
render.Success(rw, http.StatusOK, channel)
}

func (api *API) UpdateChannelByID(req *http.Request, rw http.ResponseWriter) {
func (api *API) UpdateChannelByID(rw http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
defer cancel()

Expand Down Expand Up @@ -187,7 +200,7 @@ func (api *API) UpdateChannelByID(req *http.Request, rw http.ResponseWriter) {
render.Success(rw, http.StatusNoContent, nil)
}

func (api *API) DeleteChannelByID(req *http.Request, rw http.ResponseWriter) {
func (api *API) DeleteChannelByID(rw http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
defer cancel()

Expand Down Expand Up @@ -224,7 +237,7 @@ func (api *API) DeleteChannelByID(req *http.Request, rw http.ResponseWriter) {
render.Success(rw, http.StatusNoContent, nil)
}

func (api *API) CreateChannel(req *http.Request, rw http.ResponseWriter) {
func (api *API) CreateChannel(rw http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
defer cancel()

Expand Down
14 changes: 7 additions & 7 deletions pkg/alertmanager/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ type Signoz struct {
}

type Legacy struct {
// URL is the URL of the legacy alertmanager.
URL string `mapstructure:"url"`
// ApiURL is the URL of the legacy signoz alertmanager.
ApiURL string `mapstructure:"api_url"`
}

func NewConfigFactory() factory.ConfigFactory {
Expand All @@ -42,7 +42,7 @@ func newConfig() factory.Config {
return Config{
Provider: "legacy",
Legacy: Legacy{
URL: "http://alertmanager:9093/api",
ApiURL: "http://alertmanager:9093/api",
},
Signoz: Signoz{
PollInterval: 15 * time.Second,
Expand All @@ -53,13 +53,13 @@ func newConfig() factory.Config {

func (c Config) Validate() error {
if c.Provider == "legacy" {
if c.Legacy.URL == "" {
return errors.New("url is required")
if c.Legacy.ApiURL == "" {
return errors.New("api_url is required")
}

_, err := url.Parse(c.Legacy.URL)
_, err := url.Parse(c.Legacy.ApiURL)
if err != nil {
return fmt.Errorf("url %q is invalid: %w", c.Legacy.URL, err)
return fmt.Errorf("api_url %q is invalid: %w", c.Legacy.ApiURL, err)
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/alertmanager/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

func TestNewWithEnvProvider(t *testing.T) {
t.Setenv("SIGNOZ_ALERTMANAGER_PROVIDER", "legacy")
t.Setenv("SIGNOZ_ALERTMANAGER_LEGACY_URL", "http://localhost:9093/api")
t.Setenv("SIGNOZ_ALERTMANAGER_LEGACY_API__URL", "http://localhost:9093/api")

conf, err := config.New(
context.Background(),
Expand All @@ -38,7 +38,7 @@ func TestNewWithEnvProvider(t *testing.T) {
expected := &Config{
Provider: "legacy",
Legacy: Legacy{
URL: "http://localhost:9093/api",
ApiURL: "http://localhost:9093/api",
},
Signoz: def.Signoz,
}
Expand Down
13 changes: 11 additions & 2 deletions pkg/alertmanager/legacyalertmanager/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/url"
"time"

"github.com/tidwall/gjson"
"go.signoz.io/signoz/pkg/alertmanager"
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerbatcher"
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore"
Expand Down Expand Up @@ -43,7 +44,7 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config
settings := factory.NewScopedProviderSettings(providerSettings, "go.signoz.io/signoz/pkg/alertmanager/legacyalertmanager")
configStore := sqlalertmanagerstore.NewConfigStore(sqlstore)

url, err := url.Parse(config.Legacy.URL)
url, err := url.Parse(config.Legacy.ApiURL)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -96,8 +97,12 @@ func (provider *provider) GetAlerts(ctx context.Context, orgID string, params al
return nil, err
}

if resp.StatusCode/100 != 2 {
return nil, fmt.Errorf("bad response status %v", resp.Status)
}

var alerts alertmanagertypes.GettableAlerts
if err := json.Unmarshal(body, &alerts); err != nil {
if err := json.Unmarshal([]byte(gjson.GetBytes(body, "data").Raw), &alerts); err != nil {
return nil, err
}

Expand Down Expand Up @@ -171,6 +176,10 @@ func (provider *provider) ListChannels(ctx context.Context, orgID string) ([]*al
return provider.configStore.ListChannels(ctx, orgID)
}

func (provider *provider) ListAllChannels(ctx context.Context) ([]*alertmanagertypes.Channel, error) {
return provider.configStore.ListAllChannels(ctx)
}

func (provider *provider) GetChannelByID(ctx context.Context, orgID string, channelID int) (*alertmanagertypes.Channel, error) {
return provider.configStore.GetChannelByID(ctx, orgID, channelID)
}
Expand Down
10 changes: 9 additions & 1 deletion pkg/alertmanager/signozalertmanager/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"go.signoz.io/signoz/pkg/alertmanager"
"go.signoz.io/signoz/pkg/alertmanager/alertmanagerstore/sqlalertmanagerstore"
"go.signoz.io/signoz/pkg/errors"
"go.signoz.io/signoz/pkg/factory"
"go.signoz.io/signoz/pkg/sqlstore"
"go.signoz.io/signoz/pkg/types/alertmanagertypes"
Expand All @@ -17,6 +18,7 @@ type provider struct {
settings factory.ScopedProviderSettings
configStore alertmanagertypes.ConfigStore
stateStore alertmanagertypes.StateStore
stopC chan struct{}
}

func NewFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[alertmanager.Alertmanager, alertmanager.Config] {
Expand All @@ -42,6 +44,7 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config
config: config,
configStore: configStore,
stateStore: stateStore,
stopC: make(chan struct{}),
}, nil
}

Expand All @@ -50,7 +53,7 @@ func (provider *provider) Start(ctx context.Context) error {
defer ticker.Stop()
for {
select {
case <-ctx.Done():
case <-provider.stopC:
return nil
case <-ticker.C:
if err := provider.service.SyncServers(ctx); err != nil {
Expand All @@ -61,6 +64,7 @@ func (provider *provider) Start(ctx context.Context) error {
}

func (provider *provider) Stop(ctx context.Context) error {
close(provider.stopC)
return provider.service.Stop(ctx)
}

Expand Down Expand Up @@ -91,6 +95,10 @@ func (provider *provider) ListChannels(ctx context.Context, orgID string) ([]*al
return channelList, nil
}

func (provider *provider) ListAllChannels(ctx context.Context) ([]*alertmanagertypes.Channel, error) {
return nil, errors.Newf(errors.TypeUnsupported, errors.CodeUnsupported, "not supported by provider signoz")
}

func (provider *provider) GetChannelByID(ctx context.Context, orgID string, channelID int) (*alertmanagertypes.Channel, error) {
config, err := provider.configStore.Get(ctx, orgID)
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions pkg/factory/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func NewRegistry(logger *slog.Logger, services ...NamedService) (*Registry, erro
}, nil
}

func (r *Registry) Start(ctx context.Context) error {
func (r *Registry) Start(ctx context.Context) {
for _, s := range r.services.GetInOrder() {
go func(s NamedService) {
r.logger.InfoContext(ctx, "starting service", "service", s.Name())
Expand All @@ -49,7 +49,6 @@ func (r *Registry) Start(ctx context.Context) error {
}(s)
}

return nil
}

func (r *Registry) Wait(ctx context.Context) error {
Expand Down
Loading

0 comments on commit 309ac97

Please # to comment.