Skip to content

Commit

Permalink
Merge branch 'main' into celery-misc-4
Browse files Browse the repository at this point in the history
  • Loading branch information
SagarRajput-7 authored Feb 4, 2025
2 parents 33ad3be + 3b550c4 commit 51428a1
Show file tree
Hide file tree
Showing 12 changed files with 432 additions and 61 deletions.
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
import './InfraMonitoring.styles.scss';

import { initialQueriesMap } from 'constants/queryBuilder';
import { K8sCategory } from 'container/InfraMonitoringK8s/constants';
import QueryBuilderSearch from 'container/QueryBuilder/filters/QueryBuilderSearch';
import DateTimeSelectionV2 from 'container/TopNav/DateTimeSelectionV2';
import { useQueryBuilder } from 'hooks/queryBuilder/useQueryBuilder';
import { useCallback, useMemo } from 'react';
import { IBuilderQuery } from 'types/api/queryBuilder/queryBuilderData';
import { DataSource } from 'types/common/queryBuilder';

function HostsListControls({
handleFiltersChange,
}: {
handleFiltersChange: (value: IBuilderQuery['filters']) => void;
}): JSX.Element {
const { currentQuery } = useQueryBuilder();
const currentQuery = initialQueriesMap[DataSource.METRICS];
const updatedCurrentQuery = useMemo(
() => ({
...currentQuery,
Expand Down
5 changes: 3 additions & 2 deletions frontend/src/container/InfraMonitoringK8s/K8sHeader.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
import './InfraMonitoringK8s.styles.scss';

import { Button, Select } from 'antd';
import { initialQueriesMap } from 'constants/queryBuilder';
import QueryBuilderSearch from 'container/QueryBuilder/filters/QueryBuilderSearch';
import DateTimeSelectionV2 from 'container/TopNav/DateTimeSelectionV2';
import { useQueryBuilder } from 'hooks/queryBuilder/useQueryBuilder';
import { Filter, SlidersHorizontal } from 'lucide-react';
import { useCallback, useMemo, useState } from 'react';
import { BaseAutocompleteData } from 'types/api/queryBuilder/queryAutocompleteResponse';
import { IBuilderQuery } from 'types/api/queryBuilder/queryBuilderData';
import { DataSource } from 'types/common/queryBuilder';

import { K8sCategory } from './constants';
import K8sFiltersSidePanel from './K8sFiltersSidePanel/K8sFiltersSidePanel';
Expand Down Expand Up @@ -47,7 +48,7 @@ function K8sHeader({
}: K8sHeaderProps): JSX.Element {
const [isFiltersSidePanelOpen, setIsFiltersSidePanelOpen] = useState(false);

const { currentQuery } = useQueryBuilder();
const currentQuery = initialQueriesMap[DataSource.METRICS];

const updatedCurrentQuery = useMemo(
() => ({
Expand Down
17 changes: 10 additions & 7 deletions frontend/src/hooks/queryBuilder/useFetchKeysAndValues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -243,13 +243,15 @@ export const useFetchKeysAndValues = (
fetchingSuggestionsStatus === 'success' &&
suggestionsData?.payload?.attributes
) {
setKeys(suggestionsData.payload.attributes);
setSourceKeys((prevState) =>
uniqWith(
[...(suggestionsData.payload.attributes ?? []), ...prevState],
isEqual,
),
);
if (!isInfraMonitoring) {
setKeys(suggestionsData.payload.attributes);
setSourceKeys((prevState) =>
uniqWith(
[...(suggestionsData.payload.attributes ?? []), ...prevState],
isEqual,
),
);
}
} else {
setKeys([]);
}
Expand All @@ -265,6 +267,7 @@ export const useFetchKeysAndValues = (
suggestionsData?.payload?.attributes,
fetchingSuggestionsStatus,
suggestionsData?.payload?.example_queries,
isInfraMonitoring,
]);

return {
Expand Down
67 changes: 41 additions & 26 deletions pkg/query-service/app/cloudintegrations/availableServices.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,23 +86,25 @@ func readAllServiceDefinitions() error {
continue
}

cloudProviderDirPath := path.Join(rootDirName, d.Name())
cloudServices, err := readServiceDefinitionsFromDir(cloudProviderDirPath)
cloudProvider := d.Name()

cloudProviderDirPath := path.Join(rootDirName, cloudProvider)
cloudServices, err := readServiceDefinitionsFromDir(cloudProvider, cloudProviderDirPath)
if err != nil {
return fmt.Errorf("couldn't read %s service definitions", d.Name())
return fmt.Errorf("couldn't read %s service definitions: %w", cloudProvider, err)
}

if len(cloudServices) < 1 {
return fmt.Errorf("no %s services could be read", d.Name())
return fmt.Errorf("no %s services could be read", cloudProvider)
}

availableServices[d.Name()] = cloudServices
availableServices[cloudProvider] = cloudServices
}

return nil
}

func readServiceDefinitionsFromDir(cloudProviderDirPath string) (
func readServiceDefinitionsFromDir(cloudProvider string, cloudProviderDirPath string) (
map[string]CloudServiceDetails, error,
) {
svcDefDirs, err := fs.ReadDir(serviceDefinitionFiles, cloudProviderDirPath)
Expand All @@ -118,7 +120,7 @@ func readServiceDefinitionsFromDir(cloudProviderDirPath string) (
}

svcDirPath := path.Join(cloudProviderDirPath, d.Name())
s, err := readServiceDefinition(svcDirPath)
s, err := readServiceDefinition(cloudProvider, svcDirPath)
if err != nil {
return nil, fmt.Errorf("couldn't read svc definition for %s: %w", d.Name(), err)
}
Expand All @@ -135,14 +137,14 @@ func readServiceDefinitionsFromDir(cloudProviderDirPath string) (
return svcDefs, nil
}

func readServiceDefinition(dirpath string) (*CloudServiceDetails, error) {
integrationJsonPath := path.Join(dirpath, "integration.json")
func readServiceDefinition(cloudProvider string, svcDirpath string) (*CloudServiceDetails, error) {
integrationJsonPath := path.Join(svcDirpath, "integration.json")

serializedSpec, err := serviceDefinitionFiles.ReadFile(integrationJsonPath)
if err != nil {
return nil, fmt.Errorf(
"couldn't find integration.json in %s: %w",
dirpath, err,
svcDirpath, err,
)
}

Expand All @@ -155,28 +157,17 @@ func readServiceDefinition(dirpath string) (*CloudServiceDetails, error) {
}

hydrated, err := integrations.HydrateFileUris(
integrationSpec, serviceDefinitionFiles, dirpath,
integrationSpec, serviceDefinitionFiles, svcDirpath,
)
if err != nil {
return nil, fmt.Errorf(
"couldn't hydrate files referenced in service definition %s: %w",
integrationJsonPath, err,
)
}
hydratedSpec := hydrated.(map[string]any)

hydratedSpec := hydrated.(map[string]interface{})
hydratedSpecJson, err := koanfJson.Parser().Marshal(hydratedSpec)
if err != nil {
return nil, fmt.Errorf(
"couldn't serialize hydrated integration spec back to JSON %s: %w",
integrationJsonPath, err,
)
}

var serviceDef CloudServiceDetails
decoder := json.NewDecoder(bytes.NewReader(hydratedSpecJson))
decoder.DisallowUnknownFields()
err = decoder.Decode(&serviceDef)
serviceDef, err := ParseStructWithJsonTagsFromMap[CloudServiceDetails](hydratedSpec)
if err != nil {
return nil, fmt.Errorf(
"couldn't parse hydrated JSON spec read from %s: %w",
Expand All @@ -189,11 +180,13 @@ func readServiceDefinition(dirpath string) (*CloudServiceDetails, error) {
return nil, fmt.Errorf("invalid service definition %s: %w", serviceDef.Id, err)
}

return &serviceDef, nil
serviceDef.TelemetryCollectionStrategy.Provider = cloudProvider

return serviceDef, nil

}

func validateServiceDefinition(s CloudServiceDetails) error {
func validateServiceDefinition(s *CloudServiceDetails) error {
// Validate dashboard data
seenDashboardIds := map[string]interface{}{}
for _, dd := range s.Assets.Dashboards {
Expand All @@ -211,7 +204,29 @@ func validateServiceDefinition(s CloudServiceDetails) error {
seenDashboardIds[dashboardId] = nil
}

if s.TelemetryCollectionStrategy == nil {
return fmt.Errorf("telemetry_collection_strategy is required")
}

// potentially more to follow

return nil
}

func ParseStructWithJsonTagsFromMap[StructType any](data map[string]any) (
*StructType, error,
) {
mapJson, err := json.Marshal(data)
if err != nil {
return nil, fmt.Errorf("couldn't marshal map to json: %w", err)
}

var res StructType
decoder := json.NewDecoder(bytes.NewReader(mapJson))
decoder.DisallowUnknownFields()
err = decoder.Decode(&res)
if err != nil {
return nil, fmt.Errorf("couldn't unmarshal json back to struct: %w", err)
}
return &res, nil
}
82 changes: 80 additions & 2 deletions pkg/query-service/app/cloudintegrations/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/jmoiron/sqlx"
"go.signoz.io/signoz/pkg/query-service/model"
"golang.org/x/exp/maps"
)

var SupportedCloudProviders = []string{
Expand Down Expand Up @@ -92,6 +93,11 @@ type GenerateConnectionUrlRequest struct {
type SigNozAgentConfig struct {
// The region in which SigNoz agent should be installed.
Region string `json:"region"`

IngestionUrl string `json:"ingestion_url"`
IngestionKey string `json:"ingestion_key"`
SigNozAPIUrl string `json:"signoz_api_url"`
SigNozAPIKey string `json:"signoz_api_key"`
}

type GenerateConnectionUrlResponse struct {
Expand Down Expand Up @@ -163,7 +169,17 @@ type AgentCheckInRequest struct {
}

type AgentCheckInResponse struct {
Account AccountRecord `json:"account"`
AccountId string `json:"account_id"`
CloudAccountId string `json:"cloud_account_id"`
RemovedAt *time.Time `json:"removed_at"`

IntegrationConfig IntegrationConfigForAgent `json:"integration_config"`
}

type IntegrationConfigForAgent struct {
EnabledRegions []string `json:"enabled_regions"`

TelemetryCollectionStrategy *CloudTelemetryCollectionStrategy `json:"telemetry,omitempty"`
}

func (c *Controller) CheckInAsAgent(
Expand Down Expand Up @@ -201,8 +217,70 @@ func (c *Controller) CheckInAsAgent(
return nil, model.WrapApiError(apiErr, "couldn't upsert cloud account")
}

// prepare and return integration config to be consumed by agent
telemetryCollectionStrategy, err := NewCloudTelemetryCollectionStrategy(cloudProvider)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't init telemetry collection strategy: %w", err,
))
}

agentConfig := IntegrationConfigForAgent{
EnabledRegions: []string{},
TelemetryCollectionStrategy: telemetryCollectionStrategy,
}

if account.Config != nil && account.Config.EnabledRegions != nil {
agentConfig.EnabledRegions = account.Config.EnabledRegions
}

services, apiErr := listCloudProviderServices(cloudProvider)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't list cloud services")
}
svcDetailsById := map[string]*CloudServiceDetails{}
for _, svcDetails := range services {
svcDetailsById[svcDetails.Id] = &svcDetails
}

svcConfigs, apiErr := c.serviceConfigRepo.getAllForAccount(
ctx, cloudProvider, *account.CloudAccountId,
)
if apiErr != nil {
return nil, model.WrapApiError(
apiErr, "couldn't get service configs for cloud account",
)
}

// accumulate config in a fixed order to ensure same config generated across runs
configuredSvcIds := maps.Keys(svcConfigs)
slices.Sort(configuredSvcIds)

for _, svcId := range configuredSvcIds {
svcDetails := svcDetailsById[svcId]
svcConfig := svcConfigs[svcId]

if svcDetails != nil {
metricsEnabled := svcConfig.Metrics != nil && svcConfig.Metrics.Enabled
logsEnabled := svcConfig.Logs != nil && svcConfig.Logs.Enabled
if logsEnabled || metricsEnabled {
err := agentConfig.TelemetryCollectionStrategy.AddServiceStrategy(
svcDetails.TelemetryCollectionStrategy, logsEnabled, metricsEnabled,
)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't add service telemetry collection strategy: %w", err,
))
}
}
}
}

return &AgentCheckInResponse{
Account: *account,
AccountId: account.Id,
CloudAccountId: *account.CloudAccountId,
RemovedAt: account.RemovedAt,
IntegrationConfig: agentConfig,
}, nil
}

Expand Down
13 changes: 7 additions & 6 deletions pkg/query-service/app/cloudintegrations/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ func TestAgentCheckIns(t *testing.T) {
},
)
require.Nil(apiErr)
require.Equal(testAccountId1, resp1.Account.Id)
require.Equal(testCloudAccountId1, *resp1.Account.CloudAccountId)
require.Equal(testAccountId1, resp1.AccountId)
require.Equal(testCloudAccountId1, resp1.CloudAccountId)

// The agent should not be able to check in with a different
// cloud account id for the same account.
Expand Down Expand Up @@ -262,9 +262,10 @@ func makeTestConnectedAccount(t *testing.T, controller *Controller, cloudAccount
},
)
require.Nil(apiErr)
require.Equal(testAccountId, resp.Account.Id)
require.Equal(cloudAccountId, *resp.Account.CloudAccountId)

return &resp.Account
require.Equal(testAccountId, resp.AccountId)
require.Equal(cloudAccountId, resp.CloudAccountId)

acc, err := controller.accountsRepo.get(context.TODO(), "aws", resp.AccountId)
require.Nil(err)
return acc
}
Loading

0 comments on commit 51428a1

Please # to comment.