From c3c5c07c7d2751d0ecaeff882d32545c647e7f96 Mon Sep 17 00:00:00 2001 From: pseudoyu Date: Fri, 1 Nov 2024 10:01:37 +0700 Subject: [PATCH 1/7] fix: `/workers_status` api --- .../node/component/info/handler_worker.go | 101 ++++++++++-------- 1 file changed, 57 insertions(+), 44 deletions(-) diff --git a/internal/node/component/info/handler_worker.go b/internal/node/component/info/handler_worker.go index d826e5a6..7b5d2387 100644 --- a/internal/node/component/info/handler_worker.go +++ b/internal/node/component/info/handler_worker.go @@ -24,9 +24,9 @@ type WorkerResponse struct { } type ComponentInfo struct { - Decentralized []*WorkerInfo `json:"decentralized"` - RSS *WorkerInfo `json:"rss"` - Federated []*WorkerInfo `json:"federated"` + Decentralized []*WorkerInfo `json:"decentralized,omitempty"` + RSS *WorkerInfo `json:"rss,omitempty"` + Federated []*WorkerInfo `json:"federated,omitempty"` } type WorkerInfo struct { @@ -46,52 +46,52 @@ func (c *Component) GetWorkersStatus(ctx echo.Context) error { workerCount := config.CalculateWorkerCount(c.config) workerInfoChan := make(chan *WorkerInfo, workerCount) - var response *WorkerResponse - - switch { - case c.redisClient != nil && len(c.config.Component.Decentralized) > 0: - // Fetch all worker info concurrently. + // Handle redis + decentralized case first + if c.redisClient != nil && len(c.config.Component.Decentralized) > 0 { c.fetchAllWorkerInfo(ctx, workerInfoChan) + response := c.buildWorkerResponse(workerInfoChan) - // Build the worker response. - response = c.buildWorkerResponse(workerInfoChan) - case c.config.Component.RSS != nil: - m := c.config.Component.RSS + return ctx.JSON(http.StatusOK, response) + } + + // Logic for RSS and Federated + response := &WorkerResponse{ + Data: ComponentInfo{}, + } - response = &WorkerResponse{ - Data: ComponentInfo{ - RSS: &WorkerInfo{ - WorkerID: m.ID, - Network: m.Network, - Worker: m.Worker, - Tags: rss.ToTagsMap[m.Worker.(rss.Worker)], - Platform: rss.ToPlatformMap[m.Worker.(rss.Worker)].String(), - Status: worker.StatusReady}, - }, + // Handle RSS if exists + if c.config.Component.RSS != nil { + m := c.config.Component.RSS + response.Data.RSS = &WorkerInfo{ + WorkerID: m.ID, + Network: m.Network, + Worker: m.Worker, + Tags: rss.ToTagsMap[m.Worker.(rss.Worker)], + Platform: rss.ToPlatformMap[m.Worker.(rss.Worker)].String(), + Status: worker.StatusReady, } - case len(c.config.Component.Federated) > 0: + } + + // Handle Federated if exists + if len(c.config.Component.Federated) > 0 { f := c.config.Component.Federated[0] - switch f.Worker { - case federated.Core: - response = &WorkerResponse{ - Data: ComponentInfo{ - RSS: &WorkerInfo{ - WorkerID: f.ID, - Network: f.Network, - Worker: f.Worker, - Tags: federated.ToTagsMap[federated.Core], - Platform: rss.ToPlatformMap[f.Worker.(rss.Worker)].String(), - Status: worker.StatusReady}, - }, - } - default: - return nil + if f.Worker == federated.Core { + response.Data.Federated = []*WorkerInfo{{ + WorkerID: f.ID, + Network: f.Network, + Worker: f.Worker, + Tags: federated.ToTagsMap[federated.Core], + Platform: federated.ToPlatformMap[federated.Core].String(), + Status: worker.StatusReady, + }} } - default: - return nil } - return ctx.JSON(http.StatusOK, response) + if response.Data.RSS != nil || len(response.Data.Federated) > 0 { + return ctx.JSON(http.StatusOK, response) + } + + return nil } // fetchAllWorkerInfo fetches the status of all workers concurrently. @@ -108,7 +108,19 @@ func (c *Component) fetchAllWorkerInfo(ctx echo.Context, workerInfoChan chan<- * }(w) } - modules := append(append(c.config.Component.Decentralized, c.config.Component.RSS), c.config.Component.Federated...) + modules := make([]*config.Module, 0, config.CalculateWorkerCount(c.config)) + + if len(c.config.Component.Decentralized) > 0 { + modules = append(modules, c.config.Component.Decentralized...) + } + + if len(c.config.Component.Federated) > 0 { + modules = append(modules, c.config.Component.Federated...) + } + + if c.config.Component.RSS != nil { + modules = append(modules, c.config.Component.RSS) + } for _, m := range modules { if m.Network.Protocol() == network.RSSProtocol { @@ -131,7 +143,6 @@ func (c *Component) buildWorkerResponse(workerInfoChan <-chan *WorkerInfo) *Work response := &WorkerResponse{ Data: ComponentInfo{ Decentralized: []*WorkerInfo{}, - RSS: &WorkerInfo{}, Federated: []*WorkerInfo{}, }, } @@ -139,7 +150,9 @@ func (c *Component) buildWorkerResponse(workerInfoChan <-chan *WorkerInfo) *Work for workerInfo := range workerInfoChan { switch workerInfo.Network.Protocol() { case network.RSSProtocol: - response.Data.RSS = workerInfo + if c.config.Component.RSS != nil { + response.Data.RSS = workerInfo + } case network.EthereumProtocol, network.FarcasterProtocol, network.ArweaveProtocol, network.NearProtocol: response.Data.Decentralized = append(response.Data.Decentralized, workerInfo) case network.ActivityPubProtocol: From 3e67789a52eef88b8e8a85a98bee8d77961d7bbb Mon Sep 17 00:00:00 2001 From: pseudoyu Date: Fri, 1 Nov 2024 12:27:17 +0700 Subject: [PATCH 2/7] feat: require monitor to get rsshub and federated workers --- cmd/main.go | 127 +++++++++--------- config/config.go | 2 +- .../node/component/info/handler_worker.go | 61 ++------- 3 files changed, 80 insertions(+), 110 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index b8b1d846..ec6e707d 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -91,26 +91,8 @@ var command = cobra.Command{ var settlementCaller *vsl.SettlementCaller - // Apply database migrations for all modules except the broadcaster. - if module != BroadcasterArg && !config.IsRSSComponentOnly(configFile) { - databaseClient, err = dialer.Dial(cmd.Context(), configFile.Database) - if err != nil { - return fmt.Errorf("dial database: %w", err) - } - - tx, err := databaseClient.Begin(cmd.Context()) - if err != nil { - return fmt.Errorf("begin transaction: %w", err) - } - - if err := tx.Migrate(cmd.Context()); err != nil { - err := tx.Rollback() - if err != nil { - return fmt.Errorf("rollback database: %w", err) - } - return fmt.Errorf("migrate database: %w", err) - } - + // Broadcaster does not need Redis, DB and Network Params Client + if module != BroadcasterArg { // Init a Redis client. if configFile.Redis == nil { zap.L().Error("redis configFile is missing") @@ -122,61 +104,82 @@ var command = cobra.Command{ return fmt.Errorf("new redis client: %w", err) } - vslClient, err := parameter.InitVSLClient() - if err != nil { - return fmt.Errorf("init vsl client: %w", err) - } + // DB and Network Params Client is not needed for RSS component only Node + if !config.IsRSSComponentOnly(configFile) { + databaseClient, err = dialer.Dial(cmd.Context(), configFile.Database) + if err != nil { + return fmt.Errorf("dial database: %w", err) + } - chainID, err := vslClient.ChainID(context.Background()) - if err != nil { - return fmt.Errorf("get chain id: %w", err) - } + tx, err := databaseClient.Begin(cmd.Context()) + if err != nil { + return fmt.Errorf("begin transaction: %w", err) + } - networkParamsCaller, err = vsl.NewNetworkParamsCaller(vsl.AddressNetworkParams[chainID.Int64()], vslClient) - if err != nil { - return fmt.Errorf("new network params caller: %w", err) - } + if err := tx.Migrate(cmd.Context()); err != nil { + err := tx.Rollback() + if err != nil { + return fmt.Errorf("rollback database: %w", err) + } + return fmt.Errorf("migrate database: %w", err) + } - settlementCaller, err = vsl.NewSettlementCaller(vsl.AddressSettlement[chainID.Int64()], vslClient) - if err != nil { - return fmt.Errorf("new settlement caller: %w", err) - } + vslClient, err := parameter.InitVSLClient() + if err != nil { + return fmt.Errorf("init vsl client: %w", err) + } - epoch, err := parameter.GetCurrentEpochFromVSL(settlementCaller) - if err != nil { - return fmt.Errorf("get current epoch: %w", err) - } + chainID, err := vslClient.ChainID(context.Background()) + if err != nil { + return fmt.Errorf("get chain id: %w", err) + } - // save epoch to redis cache - err = parameter.UpdateCurrentEpoch(cmd.Context(), redisClient, epoch) - if err != nil { - return fmt.Errorf("update current epoch: %w", err) - } + networkParamsCaller, err = vsl.NewNetworkParamsCaller(vsl.AddressNetworkParams[chainID.Int64()], vslClient) + if err != nil { + return fmt.Errorf("new network params caller: %w", err) + } - // when start or restart the core, worker or monitor module, it will pull network parameters from VSL and record current epoch - if _, err = parameter.PullNetworkParamsFromVSL(networkParamsCaller, uint64(epoch)); err != nil { - zap.L().Error("pull network parameters from VSL", zap.Error(err)) + settlementCaller, err = vsl.NewSettlementCaller(vsl.AddressSettlement[chainID.Int64()], vslClient) + if err != nil { + return fmt.Errorf("new settlement caller: %w", err) + } - return fmt.Errorf("pull network parameters from VSL: %w", err) - } + epoch, err := parameter.GetCurrentEpochFromVSL(settlementCaller) + if err != nil { + return fmt.Errorf("get current epoch: %w", err) + } - for network, blockStart := range parameter.CurrentNetworkStartBlock { - if blockStart == nil { - continue // Skip if the start block is not defined. + // save epoch to redis cache + err = parameter.UpdateCurrentEpoch(cmd.Context(), redisClient, epoch) + if err != nil { + return fmt.Errorf("update current epoch: %w", err) } - // Convert big.Int to int64; safe as long as the value fits in int64. - blockStartInt64 := blockStart.Block.Int64() + // when start or restart the core, worker or monitor module, it will pull network parameters from VSL and record current epoch + if _, err = parameter.PullNetworkParamsFromVSL(networkParamsCaller, uint64(epoch)); err != nil { + zap.L().Error("pull network parameters from VSL", zap.Error(err)) - // Update the current block start for the network in Redis. - err := parameter.UpdateBlockStart(cmd.Context(), redisClient, network.String(), blockStartInt64) - if err != nil { - return fmt.Errorf("update current block start: %w", err) + return fmt.Errorf("pull network parameters from VSL: %w", err) } - } - if err := tx.Commit(); err != nil { - return fmt.Errorf("commit transaction: %w", err) + for network, blockStart := range parameter.CurrentNetworkStartBlock { + if blockStart == nil { + continue // Skip if the start block is not defined. + } + + // Convert big.Int to int64; safe as long as the value fits in int64. + blockStartInt64 := blockStart.Block.Int64() + + // Update the current block start for the network in Redis. + err := parameter.UpdateBlockStart(cmd.Context(), redisClient, network.String(), blockStartInt64) + if err != nil { + return fmt.Errorf("update current block start: %w", err) + } + } + + if err := tx.Commit(); err != nil { + return fmt.Errorf("commit transaction: %w", err) + } } } diff --git a/config/config.go b/config/config.go index 95d65f87..b5343331 100644 --- a/config/config.go +++ b/config/config.go @@ -41,7 +41,7 @@ type File struct { Component *Component `mapstructure:"component" validate:"required"` Database *Database `mapstructure:"database" validate:"required"` Stream *Stream `mapstructure:"stream"` - Redis *Redis `mapstructure:"redis"` + Redis *Redis `mapstructure:"redis" validate:"required"` Observability *Telemetry `mapstructure:"observability"` } diff --git a/internal/node/component/info/handler_worker.go b/internal/node/component/info/handler_worker.go index 7b5d2387..a6136ee5 100644 --- a/internal/node/component/info/handler_worker.go +++ b/internal/node/component/info/handler_worker.go @@ -46,52 +46,14 @@ func (c *Component) GetWorkersStatus(ctx echo.Context) error { workerCount := config.CalculateWorkerCount(c.config) workerInfoChan := make(chan *WorkerInfo, workerCount) - // Handle redis + decentralized case first - if c.redisClient != nil && len(c.config.Component.Decentralized) > 0 { - c.fetchAllWorkerInfo(ctx, workerInfoChan) - response := c.buildWorkerResponse(workerInfoChan) - - return ctx.JSON(http.StatusOK, response) - } - - // Logic for RSS and Federated - response := &WorkerResponse{ - Data: ComponentInfo{}, - } - - // Handle RSS if exists - if c.config.Component.RSS != nil { - m := c.config.Component.RSS - response.Data.RSS = &WorkerInfo{ - WorkerID: m.ID, - Network: m.Network, - Worker: m.Worker, - Tags: rss.ToTagsMap[m.Worker.(rss.Worker)], - Platform: rss.ToPlatformMap[m.Worker.(rss.Worker)].String(), - Status: worker.StatusReady, - } - } - - // Handle Federated if exists - if len(c.config.Component.Federated) > 0 { - f := c.config.Component.Federated[0] - if f.Worker == federated.Core { - response.Data.Federated = []*WorkerInfo{{ - WorkerID: f.ID, - Network: f.Network, - Worker: f.Worker, - Tags: federated.ToTagsMap[federated.Core], - Platform: federated.ToPlatformMap[federated.Core].String(), - Status: worker.StatusReady, - }} - } + if c.redisClient == nil { + return ctx.JSON(http.StatusInternalServerError, "redis client is required for monitor module") } - if response.Data.RSS != nil || len(response.Data.Federated) > 0 { - return ctx.JSON(http.StatusOK, response) - } + c.fetchAllWorkerInfo(ctx, workerInfoChan) + response := c.buildWorkerResponse(workerInfoChan) - return nil + return ctx.JSON(http.StatusOK, response) } // fetchAllWorkerInfo fetches the status of all workers concurrently. @@ -141,10 +103,15 @@ func (c *Component) fetchAllWorkerInfo(ctx echo.Context, workerInfoChan chan<- * // buildWorkerResponse builds the worker response from the worker info channel func (c *Component) buildWorkerResponse(workerInfoChan <-chan *WorkerInfo) *WorkerResponse { response := &WorkerResponse{ - Data: ComponentInfo{ - Decentralized: []*WorkerInfo{}, - Federated: []*WorkerInfo{}, - }, + Data: ComponentInfo{}, + } + + if len(c.config.Component.Decentralized) > 0 { + response.Data.Decentralized = []*WorkerInfo{} + } + + if len(c.config.Component.Federated) > 0 { + response.Data.Federated = []*WorkerInfo{} } for workerInfo := range workerInfoChan { From 7045bd728f782387edcf1075216d95caa4d9aece Mon Sep 17 00:00:00 2001 From: pseudoyu Date: Tue, 5 Nov 2024 22:07:46 +0700 Subject: [PATCH 3/7] feat: remove redis dependency of rsshub and add health check --- cmd/main.go | 117 +++++++++--------- config/config.go | 2 +- internal/node/component/info/component.go | 12 +- internal/node/component/info/handler_node.go | 24 +--- .../node/component/info/handler_worker.go | 41 +++++- internal/node/indexer/server.go | 5 - internal/node/monitor/client.go | 64 ---------- internal/node/monitor/monitor.go | 22 +--- internal/node/monitor/monitor_mock.go | 31 +---- internal/node/monitor/monitor_test.go | 66 +--------- internal/node/monitor/server.go | 2 - 11 files changed, 109 insertions(+), 277 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index ec6e707d..94cf7c60 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -92,7 +92,7 @@ var command = cobra.Command{ var settlementCaller *vsl.SettlementCaller // Broadcaster does not need Redis, DB and Network Params Client - if module != BroadcasterArg { + if module != BroadcasterArg && !config.IsRSSComponentOnly(configFile) { // Init a Redis client. if configFile.Redis == nil { zap.L().Error("redis configFile is missing") @@ -104,82 +104,79 @@ var command = cobra.Command{ return fmt.Errorf("new redis client: %w", err) } - // DB and Network Params Client is not needed for RSS component only Node - if !config.IsRSSComponentOnly(configFile) { - databaseClient, err = dialer.Dial(cmd.Context(), configFile.Database) - if err != nil { - return fmt.Errorf("dial database: %w", err) - } + databaseClient, err = dialer.Dial(cmd.Context(), configFile.Database) + if err != nil { + return fmt.Errorf("dial database: %w", err) + } + + tx, err := databaseClient.Begin(cmd.Context()) + if err != nil { + return fmt.Errorf("begin transaction: %w", err) + } - tx, err := databaseClient.Begin(cmd.Context()) + if err := tx.Migrate(cmd.Context()); err != nil { + err := tx.Rollback() if err != nil { - return fmt.Errorf("begin transaction: %w", err) + return fmt.Errorf("rollback database: %w", err) } + return fmt.Errorf("migrate database: %w", err) + } - if err := tx.Migrate(cmd.Context()); err != nil { - err := tx.Rollback() - if err != nil { - return fmt.Errorf("rollback database: %w", err) - } - return fmt.Errorf("migrate database: %w", err) - } + vslClient, err := parameter.InitVSLClient() + if err != nil { + return fmt.Errorf("init vsl client: %w", err) + } - vslClient, err := parameter.InitVSLClient() - if err != nil { - return fmt.Errorf("init vsl client: %w", err) - } + chainID, err := vslClient.ChainID(context.Background()) + if err != nil { + return fmt.Errorf("get chain id: %w", err) + } - chainID, err := vslClient.ChainID(context.Background()) - if err != nil { - return fmt.Errorf("get chain id: %w", err) - } + networkParamsCaller, err = vsl.NewNetworkParamsCaller(vsl.AddressNetworkParams[chainID.Int64()], vslClient) + if err != nil { + return fmt.Errorf("new network params caller: %w", err) + } - networkParamsCaller, err = vsl.NewNetworkParamsCaller(vsl.AddressNetworkParams[chainID.Int64()], vslClient) - if err != nil { - return fmt.Errorf("new network params caller: %w", err) - } + settlementCaller, err = vsl.NewSettlementCaller(vsl.AddressSettlement[chainID.Int64()], vslClient) + if err != nil { + return fmt.Errorf("new settlement caller: %w", err) + } - settlementCaller, err = vsl.NewSettlementCaller(vsl.AddressSettlement[chainID.Int64()], vslClient) - if err != nil { - return fmt.Errorf("new settlement caller: %w", err) - } + epoch, err := parameter.GetCurrentEpochFromVSL(settlementCaller) + if err != nil { + return fmt.Errorf("get current epoch: %w", err) + } - epoch, err := parameter.GetCurrentEpochFromVSL(settlementCaller) - if err != nil { - return fmt.Errorf("get current epoch: %w", err) - } + // save epoch to redis cache + err = parameter.UpdateCurrentEpoch(cmd.Context(), redisClient, epoch) + if err != nil { + return fmt.Errorf("update current epoch: %w", err) + } - // save epoch to redis cache - err = parameter.UpdateCurrentEpoch(cmd.Context(), redisClient, epoch) - if err != nil { - return fmt.Errorf("update current epoch: %w", err) - } + // when start or restart the core, worker or monitor module, it will pull network parameters from VSL and record current epoch + if _, err = parameter.PullNetworkParamsFromVSL(networkParamsCaller, uint64(epoch)); err != nil { + zap.L().Error("pull network parameters from VSL", zap.Error(err)) - // when start or restart the core, worker or monitor module, it will pull network parameters from VSL and record current epoch - if _, err = parameter.PullNetworkParamsFromVSL(networkParamsCaller, uint64(epoch)); err != nil { - zap.L().Error("pull network parameters from VSL", zap.Error(err)) + return fmt.Errorf("pull network parameters from VSL: %w", err) + } - return fmt.Errorf("pull network parameters from VSL: %w", err) + for network, blockStart := range parameter.CurrentNetworkStartBlock { + if blockStart == nil { + continue // Skip if the start block is not defined. } - for network, blockStart := range parameter.CurrentNetworkStartBlock { - if blockStart == nil { - continue // Skip if the start block is not defined. - } + // Convert big.Int to int64; safe as long as the value fits in int64. + blockStartInt64 := blockStart.Block.Int64() - // Convert big.Int to int64; safe as long as the value fits in int64. - blockStartInt64 := blockStart.Block.Int64() - - // Update the current block start for the network in Redis. - err := parameter.UpdateBlockStart(cmd.Context(), redisClient, network.String(), blockStartInt64) - if err != nil { - return fmt.Errorf("update current block start: %w", err) - } + // Update the current block start for the network in Redis. + err := parameter.UpdateBlockStart(cmd.Context(), redisClient, network.String(), blockStartInt64) + if err != nil { + return fmt.Errorf("update current block start: %w", err) } + } - if err := tx.Commit(); err != nil { - return fmt.Errorf("commit transaction: %w", err) - } + if err := tx.Commit(); err != nil { + return fmt.Errorf("commit transaction: %w", err) } } diff --git a/config/config.go b/config/config.go index b5343331..95d65f87 100644 --- a/config/config.go +++ b/config/config.go @@ -41,7 +41,7 @@ type File struct { Component *Component `mapstructure:"component" validate:"required"` Database *Database `mapstructure:"database" validate:"required"` Stream *Stream `mapstructure:"stream"` - Redis *Redis `mapstructure:"redis" validate:"required"` + Redis *Redis `mapstructure:"redis"` Observability *Telemetry `mapstructure:"observability"` } diff --git a/internal/node/component/info/component.go b/internal/node/component/info/component.go index 4bfbcccb..0285c806 100644 --- a/internal/node/component/info/component.go +++ b/internal/node/component/info/component.go @@ -3,7 +3,6 @@ package info import ( "context" "fmt" - "net/http" "github.com/labstack/echo/v4" "github.com/redis/rueidis" @@ -12,6 +11,7 @@ import ( "github.com/rss3-network/node/internal/database" "github.com/rss3-network/node/internal/node/component" "github.com/rss3-network/node/provider/ethereum/contract/vsl" + "github.com/rss3-network/node/provider/httpx" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -24,7 +24,7 @@ type Component struct { databaseClient database.Client redisClient rueidis.Client networkParamsCaller *vsl.NetworkParamsCaller - httpClient *http.Client + httpClient httpx.Client } const Name = "info" @@ -36,12 +36,18 @@ func (c *Component) Name() string { var _ component.Component = (*Component)(nil) func NewComponent(_ context.Context, apiServer *echo.Echo, config *config.File, databaseClient database.Client, redisClient rueidis.Client, networkParamsCaller *vsl.NetworkParamsCaller) component.Component { + // Initialize curve registry. + httpxClient, err := httpx.NewHTTPClient() + if err != nil { + return nil + } + c := &Component{ config: config, databaseClient: databaseClient, redisClient: redisClient, networkParamsCaller: networkParamsCaller, - httpClient: http.DefaultClient, + httpClient: httpxClient, } apiServer.GET("/", c.GetNodeOperator) diff --git a/internal/node/component/info/handler_node.go b/internal/node/component/info/handler_node.go index 3819259f..bc260271 100644 --- a/internal/node/component/info/handler_node.go +++ b/internal/node/component/info/handler_node.go @@ -300,32 +300,16 @@ func (c *Component) sendRequest(ctx context.Context, path string, result any) er internalURL.Path = path - req, err := http.NewRequestWithContext(ctx, http.MethodGet, internalURL.String(), nil) + body, err := c.httpClient.Fetch(ctx, internalURL.String()) if err != nil { - return fmt.Errorf("new request: %w", err) + return fmt.Errorf("fetch request: %w", err) } + defer body.Close() - req.Header.Set("Content-Type", "application/json") - - resp, err := c.httpClient.Do(req) - if err != nil { - return fmt.Errorf("do request: %w", err) - } - - defer func() { - _ = resp.Body.Close() - }() - - if err = json.NewDecoder(resp.Body).Decode(&result); err != nil { + if err = json.NewDecoder(body).Decode(&result); err != nil { return fmt.Errorf("decode response: %w", err) } - if resp.StatusCode != http.StatusOK { - marshal, _ := json.Marshal(result) - - return fmt.Errorf("unexpected status: %s, response: %s", resp.Status, string(marshal)) - } - return nil } diff --git a/internal/node/component/info/handler_worker.go b/internal/node/component/info/handler_worker.go index a6136ee5..015a31ce 100644 --- a/internal/node/component/info/handler_worker.go +++ b/internal/node/component/info/handler_worker.go @@ -4,7 +4,10 @@ import ( "context" "encoding/json" "fmt" + "io" "net/http" + "net/url" + "path" "sync" "github.com/labstack/echo/v4" @@ -46,11 +49,9 @@ func (c *Component) GetWorkersStatus(ctx echo.Context) error { workerCount := config.CalculateWorkerCount(c.config) workerInfoChan := make(chan *WorkerInfo, workerCount) - if c.redisClient == nil { - return ctx.JSON(http.StatusInternalServerError, "redis client is required for monitor module") - } - + // Fetch the status of all workers concurrently c.fetchAllWorkerInfo(ctx, workerInfoChan) + response := c.buildWorkerResponse(workerInfoChan) return ctx.JSON(http.StatusOK, response) @@ -142,8 +143,36 @@ func (c *Component) fetchWorkerInfo(ctx context.Context, module *config.Module) } } - // Fetch status and progress from a specific worker by id. - status, workerProgress := c.getWorkerStatusAndProgressByID(ctx, module.ID) + var ( + status worker.Status + workerProgress monitor.WorkerProgress + ) + + status = worker.StatusUnhealthy + + if module.Network.Protocol() == network.RSSProtocol { + // Check RSS worker health status + baseURL, err := url.Parse(module.EndpointID) + if err != nil { + zap.L().Error("invalid RSS endpoint", zap.String("endpoint", module.EndpointID)) + } + + baseURL.Path = path.Join(baseURL.Path, "healthz") + + var body io.ReadCloser + body, err = c.httpClient.Fetch(ctx, baseURL.String()) + + if err != nil { + zap.L().Error("fetch RSS healthz", zap.String("endpoint", baseURL.String()), zap.Error(err)) + } else { + defer body.Close() + + status = worker.StatusReady + } + } else { + // Fetch status and progress from a specific worker by id. + status, workerProgress = c.getWorkerStatusAndProgressByID(ctx, module.ID) + } workerInfo := &WorkerInfo{ WorkerID: module.ID, diff --git a/internal/node/indexer/server.go b/internal/node/indexer/server.go index 9ab1cba7..314e28d0 100644 --- a/internal/node/indexer/server.go +++ b/internal/node/indexer/server.go @@ -326,11 +326,6 @@ func NewServer(ctx context.Context, config *config.Module, databaseClient databa if err != nil { return nil, fmt.Errorf("new near monitorClient: %w", err) } - case network.RSSProtocol: - instance.monitorClient, err = monitor.NewRssClient(config.EndpointID, config.Parameters) - if err != nil { - return nil, fmt.Errorf("new rss monitorClient: %w", err) - } } if err := instance.initializeMeter(); err != nil { diff --git a/internal/node/monitor/client.go b/internal/node/monitor/client.go index 003de0c3..36ba59b7 100644 --- a/internal/node/monitor/client.go +++ b/internal/node/monitor/client.go @@ -4,18 +4,14 @@ import ( "context" "errors" "fmt" - "net/url" - "path" "strconv" "time" "github.com/rss3-network/node/config" - "github.com/rss3-network/node/internal/node/component/rss" "github.com/rss3-network/node/provider/activitypub/mastodon" "github.com/rss3-network/node/provider/arweave" "github.com/rss3-network/node/provider/ethereum" "github.com/rss3-network/node/provider/farcaster" - "github.com/rss3-network/node/provider/httpx" "github.com/rss3-network/node/provider/near" "github.com/rss3-network/node/schema/worker" "github.com/rss3-network/node/schema/worker/federated" @@ -211,66 +207,6 @@ func NewFarcasterClient() (Client, error) { return &farcasterClient{}, nil } -// rssClient is a client implementation for rss. -type rssClient struct { - httpClient httpx.Client - url string -} - -// make sure client implements Client -var _ Client = (*rssClient)(nil) - -func (c *rssClient) CurrentState(_ CheckpointState) (uint64, uint64) { - return 0, 0 -} - -func (c *rssClient) TargetState(_ *config.Parameters) (uint64, uint64) { - return 0, 0 -} - -// LatestState requests a URL to check if it can be accessed correctly. -func (c *rssClient) LatestState(ctx context.Context) (uint64, uint64, error) { - _, err := c.httpClient.Fetch(ctx, c.url) - - if err != nil { - return 0, 0, err - } - - return 0, 0, nil -} - -// NewRssClient returns a new rss client. -func NewRssClient(endpoint string, param *config.Parameters) (Client, error) { - base, err := url.Parse(endpoint) - if err != nil { - return nil, fmt.Errorf("parse RSSHub endpoint: %w", err) - } - - // used for health checks - base.Path = path.Join(base.Path, "healthz") - - option, err := rss.NewOption(param) - if err != nil { - return nil, fmt.Errorf("parse config parameters: %w", err) - } - - if option.Authentication.AccessKey != "" { - query := base.Query() - query.Set("key", option.Authentication.AccessKey) - base.RawQuery = query.Encode() - } - - httpClient, err := httpx.NewHTTPClient() - if err != nil { - return nil, err - } - - return &rssClient{ - httpClient: httpClient, - url: base.String(), - }, nil -} - func (c *activitypubClient) CurrentState(_ CheckpointState) (uint64, uint64) { return 0, 0 } diff --git a/internal/node/monitor/monitor.go b/internal/node/monitor/monitor.go index 217fa7f3..6f2bec3e 100644 --- a/internal/node/monitor/monitor.go +++ b/internal/node/monitor/monitor.go @@ -12,7 +12,6 @@ import ( workerx "github.com/rss3-network/node/schema/worker" "github.com/rss3-network/node/schema/worker/decentralized" "github.com/rss3-network/protocol-go/schema/network" - "github.com/samber/lo" "go.uber.org/zap" ) @@ -36,7 +35,7 @@ type WorkerProgress struct { func (m *Monitor) MonitorWorkerStatus(ctx context.Context) error { var wg sync.WaitGroup - errChan := make(chan error, len(m.config.Component.Decentralized)+lo.Ternary(m.config.Component.RSS != nil, 1, 0)+len(m.config.Component.Federated)) + errChan := make(chan error, len(m.config.Component.Decentralized)+len(m.config.Component.Federated)) processWorker := func(w *config.Module, processFunc func(context.Context, *config.Module) error) { wg.Add(1) @@ -54,10 +53,6 @@ func (m *Monitor) MonitorWorkerStatus(ctx context.Context) error { processWorker(w, m.processDecentralizedWorker) } - if m.config.Component.RSS != nil { - processWorker(m.config.Component.RSS, m.processRSSWorker) - } - if m.config.Component.Federated != nil { for _, federatedComponent := range m.config.Component.Federated { processWorker(federatedComponent, m.processFederatedWorker) @@ -159,21 +154,6 @@ func (m *Monitor) processFederatedWorker(ctx context.Context, w *config.Module) return m.UpdateWorkerStatusByID(ctx, w.ID, targetStatus.String()) } -// processRSSWorker processes the rss worker status. -func (m *Monitor) processRSSWorker(ctx context.Context, w *config.Module) error { - client, ok := m.clients[w.Network] - if !ok { - return fmt.Errorf("client not exist") - } - - targetStatus := workerx.StatusReady - if _, _, err := client.LatestState(ctx); err != nil { - targetStatus = workerx.StatusUnhealthy - } - - return m.UpdateWorkerStatusByID(ctx, w.ID, targetStatus.String()) -} - // getWorkerIndexingStateByClients gets the latest block height (arweave), block number (ethereum), event id (farcaster). func (m *Monitor) getWorkerIndexingStateByClients(ctx context.Context, n network.Network, w string, state CheckpointState, param *config.Parameters) (uint64, uint64, uint64, error) { client, ok := m.clients[n] diff --git a/internal/node/monitor/monitor_mock.go b/internal/node/monitor/monitor_mock.go index 94099974..4af35884 100644 --- a/internal/node/monitor/monitor_mock.go +++ b/internal/node/monitor/monitor_mock.go @@ -7,16 +7,14 @@ import ( "github.com/rss3-network/node/config" "github.com/rss3-network/node/config/parameter" - workerx "github.com/rss3-network/node/schema/worker" "github.com/rss3-network/node/schema/worker/decentralized" - "github.com/samber/lo" "go.uber.org/zap" ) func (m *Monitor) MonitorMockWorkerStatus(ctx context.Context, currentState CheckpointState, targetWorkerState, latestState uint64) error { var wg sync.WaitGroup - errChan := make(chan error, len(m.config.Component.Decentralized)+lo.Ternary(m.config.Component.RSS != nil, 1, 0)) + errChan := make(chan error, len(m.config.Component.Decentralized)+len(m.config.Component.Federated)) for _, w := range m.config.Component.Decentralized { wg.Add(1) @@ -30,18 +28,6 @@ func (m *Monitor) MonitorMockWorkerStatus(ctx context.Context, currentState Chec }(w) } - if m.config.Component.RSS != nil { - wg.Add(1) - - go func(w *config.Module) { - defer wg.Done() - - if err := m.processMockRSSWorker(ctx, w); err != nil { - errChan <- err - } - }(m.config.Component.RSS) - } - go func() { wg.Wait() close(errChan) @@ -83,18 +69,3 @@ func (m *Monitor) processMockWorker(ctx context.Context, w *config.Module, curre return nil } - -// processMockRSSWorker processes the rss worker status. -func (m *Monitor) processMockRSSWorker(ctx context.Context, w *config.Module) error { - client, ok := m.clients[w.Network] - if !ok { - return fmt.Errorf("client not exist") - } - - targetStatus := workerx.StatusReady - if _, _, err := client.LatestState(ctx); err != nil { - targetStatus = workerx.StatusUnhealthy - } - - return m.UpdateWorkerStatusByID(ctx, w.ID, targetStatus.String()) -} diff --git a/internal/node/monitor/monitor_test.go b/internal/node/monitor/monitor_test.go index be7a2855..f720a261 100644 --- a/internal/node/monitor/monitor_test.go +++ b/internal/node/monitor/monitor_test.go @@ -13,7 +13,6 @@ import ( redisx "github.com/rss3-network/node/provider/redis" "github.com/rss3-network/node/schema/worker" "github.com/rss3-network/node/schema/worker/decentralized" - "github.com/rss3-network/node/schema/worker/rss" "github.com/rss3-network/protocol-go/schema/network" "github.com/stretchr/testify/require" ) @@ -931,50 +930,6 @@ func TestMonitor(t *testing.T) { want: worker.StatusIndexing, wantError: require.NoError, }, - - // RSS rsshub - { - name: "RSSHub Worker Ready Status -> Unhealthy Status", - source: network.RSSProtocol, - arguments: arguments{ - config: &config.File{ - Component: &config.Component{ - RSS: &config.Module{ - ID: "rss-rsshub", - Network: network.RSSHub, - Worker: rss.Core, - EndpointID: "https://rsshub3.bruce.com", - }, - }, - }, - currentState: monitor.CheckpointState{}, - latestState: 0, - initialStatus: worker.StatusReady, - }, - want: worker.StatusUnhealthy, - wantError: require.NoError, - }, - { - name: "Rsshub Worker Ready Status -> Ready Status", - source: network.RSSProtocol, - arguments: arguments{ - config: &config.File{ - Component: &config.Component{ - RSS: &config.Module{ - ID: "rss-rsshub", - Network: network.RSSHub, - Worker: rss.Core, - EndpointID: "https://rsshub.app", - }, - }, - }, - currentState: monitor.CheckpointState{}, - latestState: 0, - initialStatus: worker.StatusReady, - }, - want: worker.StatusReady, - wantError: require.NoError, - }, } // Start Redis container @@ -1001,7 +956,7 @@ func TestMonitor(t *testing.T) { testcase := testcase switch testcase.source { - case network.FarcasterProtocol, network.ArweaveProtocol, network.EthereumProtocol, network.NearProtocol: + case network.FarcasterProtocol, network.ArweaveProtocol, network.EthereumProtocol, network.NearProtocol, network.ActivityPubProtocol: t.Run(testcase.name, func(t *testing.T) { ctx := context.Background() @@ -1024,25 +979,6 @@ func TestMonitor(t *testing.T) { status := instance.GetWorkerStatusByID(ctx, testcase.arguments.config.Component.Decentralized[0].ID) require.Equal(t, testcase.want, status) }) - case network.RSSProtocol: - t.Run(testcase.name, func(t *testing.T) { - ctx := context.Background() - - instance, err := monitor.NewMonitor(ctx, testcase.arguments.config, nil, redisClient, nil, nil) - require.NoError(t, err) - - // update worker status to initial status - err = instance.UpdateWorkerStatusByID(ctx, testcase.arguments.config.Component.RSS.ID, testcase.arguments.initialStatus.String()) - require.NoError(t, err) - - // run monitor - err = instance.MonitorMockWorkerStatus(ctx, testcase.arguments.currentState, testcase.arguments.targetState, testcase.arguments.latestState) - require.NoError(t, err) - - // check final worker status - status := instance.GetWorkerStatusByID(ctx, testcase.arguments.config.Component.RSS.ID) - require.Equal(t, testcase.want, status) - }) default: } } diff --git a/internal/node/monitor/server.go b/internal/node/monitor/server.go index 95afbaf7..a5d52026 100644 --- a/internal/node/monitor/server.go +++ b/internal/node/monitor/server.go @@ -97,8 +97,6 @@ func initNetworkClient(m *config.Module) (Client, error) { client, err = NewArweaveClient() case network.FarcasterProtocol: client, err = NewFarcasterClient() - case network.RSSProtocol: - client, err = NewRssClient(m.EndpointID, m.Parameters) case network.EthereumProtocol: client, err = NewEthereumClient(m.Endpoint) case network.NearProtocol: From 2669b46bc9fc7c2ec5c560af5cea73f414116222 Mon Sep 17 00:00:00 2001 From: pseudoyu Date: Tue, 5 Nov 2024 22:13:14 +0700 Subject: [PATCH 4/7] feat: also use access key support --- internal/node/component/info/handler_worker.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/internal/node/component/info/handler_worker.go b/internal/node/component/info/handler_worker.go index 015a31ce..3c87b812 100644 --- a/internal/node/component/info/handler_worker.go +++ b/internal/node/component/info/handler_worker.go @@ -12,6 +12,7 @@ import ( "github.com/labstack/echo/v4" "github.com/rss3-network/node/config" + rssx "github.com/rss3-network/node/internal/node/component/rss" "github.com/rss3-network/node/internal/node/monitor" "github.com/rss3-network/node/schema/worker" "github.com/rss3-network/node/schema/worker/decentralized" @@ -159,6 +160,16 @@ func (c *Component) fetchWorkerInfo(ctx context.Context, module *config.Module) baseURL.Path = path.Join(baseURL.Path, "healthz") + // Parse RSS options from module parameters + option, err := rssx.NewOption(module.Parameters) + if err != nil { + zap.L().Error("parse config parameters", zap.Error(err)) + } else if option.Authentication.AccessKey != "" { + query := baseURL.Query() + query.Set("key", option.Authentication.AccessKey) + baseURL.RawQuery = query.Encode() + } + var body io.ReadCloser body, err = c.httpClient.Fetch(ctx, baseURL.String()) From d65ebc412364db5564d075c7f4c0fb0a74362093 Mon Sep 17 00:00:00 2001 From: pseudoyu Date: Wed, 6 Nov 2024 07:44:40 +0700 Subject: [PATCH 5/7] refactor: extract rss health check logic to seperate func --- internal/node/component/info/component.go | 1 - .../node/component/info/handler_worker.go | 64 ++++++++++--------- 2 files changed, 35 insertions(+), 30 deletions(-) diff --git a/internal/node/component/info/component.go b/internal/node/component/info/component.go index 0285c806..1621b0f6 100644 --- a/internal/node/component/info/component.go +++ b/internal/node/component/info/component.go @@ -36,7 +36,6 @@ func (c *Component) Name() string { var _ component.Component = (*Component)(nil) func NewComponent(_ context.Context, apiServer *echo.Echo, config *config.File, databaseClient database.Client, redisClient rueidis.Client, networkParamsCaller *vsl.NetworkParamsCaller) component.Component { - // Initialize curve registry. httpxClient, err := httpx.NewHTTPClient() if err != nil { return nil diff --git a/internal/node/component/info/handler_worker.go b/internal/node/component/info/handler_worker.go index 3c87b812..83fce497 100644 --- a/internal/node/component/info/handler_worker.go +++ b/internal/node/component/info/handler_worker.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "io" "net/http" "net/url" "path" @@ -153,35 +152,9 @@ func (c *Component) fetchWorkerInfo(ctx context.Context, module *config.Module) if module.Network.Protocol() == network.RSSProtocol { // Check RSS worker health status - baseURL, err := url.Parse(module.EndpointID) - if err != nil { - zap.L().Error("invalid RSS endpoint", zap.String("endpoint", module.EndpointID)) - } - - baseURL.Path = path.Join(baseURL.Path, "healthz") - - // Parse RSS options from module parameters - option, err := rssx.NewOption(module.Parameters) - if err != nil { - zap.L().Error("parse config parameters", zap.Error(err)) - } else if option.Authentication.AccessKey != "" { - query := baseURL.Query() - query.Set("key", option.Authentication.AccessKey) - baseURL.RawQuery = query.Encode() - } - - var body io.ReadCloser - body, err = c.httpClient.Fetch(ctx, baseURL.String()) - - if err != nil { - zap.L().Error("fetch RSS healthz", zap.String("endpoint", baseURL.String()), zap.Error(err)) - } else { - defer body.Close() - - status = worker.StatusReady - } + status, _ = c.checkRSSWorkerHealth(ctx, module) } else { - // Fetch status and progress from a specific worker by id. + // Fetch decentralized or federated worker status and progress from a specific worker by id. status, workerProgress = c.getWorkerStatusAndProgressByID(ctx, module.ID) } @@ -231,6 +204,39 @@ func (c *Component) fetchWorkerInfo(ctx context.Context, module *config.Module) return workerInfo } +// checkRSSWorkerHealth checks the health of the RSS worker by `healthz` api. +func (c *Component) checkRSSWorkerHealth(ctx context.Context, module *config.Module) (worker.Status, error) { + baseURL, err := url.Parse(module.EndpointID) + if err != nil { + zap.L().Error("invalid RSS endpoint", zap.String("endpoint", module.EndpointID)) + return worker.StatusUnhealthy, err + } + + baseURL.Path = path.Join(baseURL.Path, "healthz") + + // Parse RSS options from module parameters + option, err := rssx.NewOption(module.Parameters) + if err != nil { + zap.L().Error("parse config parameters", zap.Error(err)) + return worker.StatusUnhealthy, err + } + + if option.Authentication.AccessKey != "" { + query := baseURL.Query() + query.Set("key", option.Authentication.AccessKey) + baseURL.RawQuery = query.Encode() + } + + body, err := c.httpClient.Fetch(ctx, baseURL.String()) + if err != nil { + zap.L().Error("fetch RSS healthz", zap.String("endpoint", baseURL.String()), zap.Error(err)) + return worker.StatusUnhealthy, err + } + defer body.Close() + + return worker.StatusReady, nil +} + // getWorkerStatusAndProgressByID gets both worker status and progress from Redis cache by worker ID. func (c *Component) getWorkerStatusAndProgressByID(ctx context.Context, workerID string) (worker.Status, monitor.WorkerProgress) { if c.redisClient == nil { From 8b60d2632392e954d4f13e1b75aac8f2f6506fd6 Mon Sep 17 00:00:00 2001 From: pseudoyu Date: Wed, 6 Nov 2024 07:46:40 +0700 Subject: [PATCH 6/7] chore: add more comments --- cmd/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/main.go b/cmd/main.go index 94cf7c60..b0e7de88 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -91,7 +91,7 @@ var command = cobra.Command{ var settlementCaller *vsl.SettlementCaller - // Broadcaster does not need Redis, DB and Network Params Client + // Broadcaster and RSS Only Node does not need Redis, DB and Network Params Client if module != BroadcasterArg && !config.IsRSSComponentOnly(configFile) { // Init a Redis client. if configFile.Redis == nil { From e92afc6652ed78b365e39769d5157b55ee8dab49 Mon Sep 17 00:00:00 2001 From: pseudoyu Date: Wed, 6 Nov 2024 07:49:50 +0700 Subject: [PATCH 7/7] fix: lint error --- internal/node/component/info/handler_worker.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/internal/node/component/info/handler_worker.go b/internal/node/component/info/handler_worker.go index 83fce497..6886303e 100644 --- a/internal/node/component/info/handler_worker.go +++ b/internal/node/component/info/handler_worker.go @@ -148,8 +148,6 @@ func (c *Component) fetchWorkerInfo(ctx context.Context, module *config.Module) workerProgress monitor.WorkerProgress ) - status = worker.StatusUnhealthy - if module.Network.Protocol() == network.RSSProtocol { // Check RSS worker health status status, _ = c.checkRSSWorkerHealth(ctx, module)