From 3e67789a52eef88b8e8a85a98bee8d77961d7bbb Mon Sep 17 00:00:00 2001 From: pseudoyu Date: Fri, 1 Nov 2024 12:27:17 +0700 Subject: [PATCH] feat: require monitor to get rsshub and federated workers --- cmd/main.go | 127 +++++++++--------- config/config.go | 2 +- .../node/component/info/handler_worker.go | 61 ++------- 3 files changed, 80 insertions(+), 110 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index b8b1d846..ec6e707d 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -91,26 +91,8 @@ var command = cobra.Command{ var settlementCaller *vsl.SettlementCaller - // Apply database migrations for all modules except the broadcaster. - if module != BroadcasterArg && !config.IsRSSComponentOnly(configFile) { - databaseClient, err = dialer.Dial(cmd.Context(), configFile.Database) - if err != nil { - return fmt.Errorf("dial database: %w", err) - } - - tx, err := databaseClient.Begin(cmd.Context()) - if err != nil { - return fmt.Errorf("begin transaction: %w", err) - } - - if err := tx.Migrate(cmd.Context()); err != nil { - err := tx.Rollback() - if err != nil { - return fmt.Errorf("rollback database: %w", err) - } - return fmt.Errorf("migrate database: %w", err) - } - + // Broadcaster does not need Redis, DB and Network Params Client + if module != BroadcasterArg { // Init a Redis client. if configFile.Redis == nil { zap.L().Error("redis configFile is missing") @@ -122,61 +104,82 @@ var command = cobra.Command{ return fmt.Errorf("new redis client: %w", err) } - vslClient, err := parameter.InitVSLClient() - if err != nil { - return fmt.Errorf("init vsl client: %w", err) - } + // DB and Network Params Client is not needed for RSS component only Node + if !config.IsRSSComponentOnly(configFile) { + databaseClient, err = dialer.Dial(cmd.Context(), configFile.Database) + if err != nil { + return fmt.Errorf("dial database: %w", err) + } - chainID, err := vslClient.ChainID(context.Background()) - if err != nil { - return fmt.Errorf("get chain id: %w", err) - } + tx, err := databaseClient.Begin(cmd.Context()) + if err != nil { + return fmt.Errorf("begin transaction: %w", err) + } - networkParamsCaller, err = vsl.NewNetworkParamsCaller(vsl.AddressNetworkParams[chainID.Int64()], vslClient) - if err != nil { - return fmt.Errorf("new network params caller: %w", err) - } + if err := tx.Migrate(cmd.Context()); err != nil { + err := tx.Rollback() + if err != nil { + return fmt.Errorf("rollback database: %w", err) + } + return fmt.Errorf("migrate database: %w", err) + } - settlementCaller, err = vsl.NewSettlementCaller(vsl.AddressSettlement[chainID.Int64()], vslClient) - if err != nil { - return fmt.Errorf("new settlement caller: %w", err) - } + vslClient, err := parameter.InitVSLClient() + if err != nil { + return fmt.Errorf("init vsl client: %w", err) + } - epoch, err := parameter.GetCurrentEpochFromVSL(settlementCaller) - if err != nil { - return fmt.Errorf("get current epoch: %w", err) - } + chainID, err := vslClient.ChainID(context.Background()) + if err != nil { + return fmt.Errorf("get chain id: %w", err) + } - // save epoch to redis cache - err = parameter.UpdateCurrentEpoch(cmd.Context(), redisClient, epoch) - if err != nil { - return fmt.Errorf("update current epoch: %w", err) - } + networkParamsCaller, err = vsl.NewNetworkParamsCaller(vsl.AddressNetworkParams[chainID.Int64()], vslClient) + if err != nil { + return fmt.Errorf("new network params caller: %w", err) + } - // when start or restart the core, worker or monitor module, it will pull network parameters from VSL and record current epoch - if _, err = parameter.PullNetworkParamsFromVSL(networkParamsCaller, uint64(epoch)); err != nil { - zap.L().Error("pull network parameters from VSL", zap.Error(err)) + settlementCaller, err = vsl.NewSettlementCaller(vsl.AddressSettlement[chainID.Int64()], vslClient) + if err != nil { + return fmt.Errorf("new settlement caller: %w", err) + } - return fmt.Errorf("pull network parameters from VSL: %w", err) - } + epoch, err := parameter.GetCurrentEpochFromVSL(settlementCaller) + if err != nil { + return fmt.Errorf("get current epoch: %w", err) + } - for network, blockStart := range parameter.CurrentNetworkStartBlock { - if blockStart == nil { - continue // Skip if the start block is not defined. + // save epoch to redis cache + err = parameter.UpdateCurrentEpoch(cmd.Context(), redisClient, epoch) + if err != nil { + return fmt.Errorf("update current epoch: %w", err) } - // Convert big.Int to int64; safe as long as the value fits in int64. - blockStartInt64 := blockStart.Block.Int64() + // when start or restart the core, worker or monitor module, it will pull network parameters from VSL and record current epoch + if _, err = parameter.PullNetworkParamsFromVSL(networkParamsCaller, uint64(epoch)); err != nil { + zap.L().Error("pull network parameters from VSL", zap.Error(err)) - // Update the current block start for the network in Redis. - err := parameter.UpdateBlockStart(cmd.Context(), redisClient, network.String(), blockStartInt64) - if err != nil { - return fmt.Errorf("update current block start: %w", err) + return fmt.Errorf("pull network parameters from VSL: %w", err) } - } - if err := tx.Commit(); err != nil { - return fmt.Errorf("commit transaction: %w", err) + for network, blockStart := range parameter.CurrentNetworkStartBlock { + if blockStart == nil { + continue // Skip if the start block is not defined. + } + + // Convert big.Int to int64; safe as long as the value fits in int64. + blockStartInt64 := blockStart.Block.Int64() + + // Update the current block start for the network in Redis. + err := parameter.UpdateBlockStart(cmd.Context(), redisClient, network.String(), blockStartInt64) + if err != nil { + return fmt.Errorf("update current block start: %w", err) + } + } + + if err := tx.Commit(); err != nil { + return fmt.Errorf("commit transaction: %w", err) + } } } diff --git a/config/config.go b/config/config.go index 95d65f87..b5343331 100644 --- a/config/config.go +++ b/config/config.go @@ -41,7 +41,7 @@ type File struct { Component *Component `mapstructure:"component" validate:"required"` Database *Database `mapstructure:"database" validate:"required"` Stream *Stream `mapstructure:"stream"` - Redis *Redis `mapstructure:"redis"` + Redis *Redis `mapstructure:"redis" validate:"required"` Observability *Telemetry `mapstructure:"observability"` } diff --git a/internal/node/component/info/handler_worker.go b/internal/node/component/info/handler_worker.go index 7b5d2387..a6136ee5 100644 --- a/internal/node/component/info/handler_worker.go +++ b/internal/node/component/info/handler_worker.go @@ -46,52 +46,14 @@ func (c *Component) GetWorkersStatus(ctx echo.Context) error { workerCount := config.CalculateWorkerCount(c.config) workerInfoChan := make(chan *WorkerInfo, workerCount) - // Handle redis + decentralized case first - if c.redisClient != nil && len(c.config.Component.Decentralized) > 0 { - c.fetchAllWorkerInfo(ctx, workerInfoChan) - response := c.buildWorkerResponse(workerInfoChan) - - return ctx.JSON(http.StatusOK, response) - } - - // Logic for RSS and Federated - response := &WorkerResponse{ - Data: ComponentInfo{}, - } - - // Handle RSS if exists - if c.config.Component.RSS != nil { - m := c.config.Component.RSS - response.Data.RSS = &WorkerInfo{ - WorkerID: m.ID, - Network: m.Network, - Worker: m.Worker, - Tags: rss.ToTagsMap[m.Worker.(rss.Worker)], - Platform: rss.ToPlatformMap[m.Worker.(rss.Worker)].String(), - Status: worker.StatusReady, - } - } - - // Handle Federated if exists - if len(c.config.Component.Federated) > 0 { - f := c.config.Component.Federated[0] - if f.Worker == federated.Core { - response.Data.Federated = []*WorkerInfo{{ - WorkerID: f.ID, - Network: f.Network, - Worker: f.Worker, - Tags: federated.ToTagsMap[federated.Core], - Platform: federated.ToPlatformMap[federated.Core].String(), - Status: worker.StatusReady, - }} - } + if c.redisClient == nil { + return ctx.JSON(http.StatusInternalServerError, "redis client is required for monitor module") } - if response.Data.RSS != nil || len(response.Data.Federated) > 0 { - return ctx.JSON(http.StatusOK, response) - } + c.fetchAllWorkerInfo(ctx, workerInfoChan) + response := c.buildWorkerResponse(workerInfoChan) - return nil + return ctx.JSON(http.StatusOK, response) } // fetchAllWorkerInfo fetches the status of all workers concurrently. @@ -141,10 +103,15 @@ func (c *Component) fetchAllWorkerInfo(ctx echo.Context, workerInfoChan chan<- * // buildWorkerResponse builds the worker response from the worker info channel func (c *Component) buildWorkerResponse(workerInfoChan <-chan *WorkerInfo) *WorkerResponse { response := &WorkerResponse{ - Data: ComponentInfo{ - Decentralized: []*WorkerInfo{}, - Federated: []*WorkerInfo{}, - }, + Data: ComponentInfo{}, + } + + if len(c.config.Component.Decentralized) > 0 { + response.Data.Decentralized = []*WorkerInfo{} + } + + if len(c.config.Component.Federated) > 0 { + response.Data.Federated = []*WorkerInfo{} } for workerInfo := range workerInfoChan {