diff --git a/cmd/main.go b/cmd/main.go index b8b1d846..b0e7de88 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -91,8 +91,19 @@ var command = cobra.Command{ var settlementCaller *vsl.SettlementCaller - // Apply database migrations for all modules except the broadcaster. + // Broadcaster and RSS Only Node does not need Redis, DB and Network Params Client if module != BroadcasterArg && !config.IsRSSComponentOnly(configFile) { + // Init a Redis client. + if configFile.Redis == nil { + zap.L().Error("redis configFile is missing") + return fmt.Errorf("redis configFile is missing") + } + + redisClient, err = redis.NewClient(*configFile.Redis) + if err != nil { + return fmt.Errorf("new redis client: %w", err) + } + databaseClient, err = dialer.Dial(cmd.Context(), configFile.Database) if err != nil { return fmt.Errorf("dial database: %w", err) @@ -111,17 +122,6 @@ var command = cobra.Command{ return fmt.Errorf("migrate database: %w", err) } - // Init a Redis client. - if configFile.Redis == nil { - zap.L().Error("redis configFile is missing") - return fmt.Errorf("redis configFile is missing") - } - - redisClient, err = redis.NewClient(*configFile.Redis) - if err != nil { - return fmt.Errorf("new redis client: %w", err) - } - vslClient, err := parameter.InitVSLClient() if err != nil { return fmt.Errorf("init vsl client: %w", err) diff --git a/internal/node/component/info/component.go b/internal/node/component/info/component.go index 4bfbcccb..1621b0f6 100644 --- a/internal/node/component/info/component.go +++ b/internal/node/component/info/component.go @@ -3,7 +3,6 @@ package info import ( "context" "fmt" - "net/http" "github.com/labstack/echo/v4" "github.com/redis/rueidis" @@ -12,6 +11,7 @@ import ( "github.com/rss3-network/node/internal/database" "github.com/rss3-network/node/internal/node/component" "github.com/rss3-network/node/provider/ethereum/contract/vsl" + "github.com/rss3-network/node/provider/httpx" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -24,7 +24,7 @@ type Component struct { databaseClient database.Client redisClient rueidis.Client networkParamsCaller *vsl.NetworkParamsCaller - httpClient *http.Client + httpClient httpx.Client } const Name = "info" @@ -36,12 +36,17 @@ func (c *Component) Name() string { var _ component.Component = (*Component)(nil) func NewComponent(_ context.Context, apiServer *echo.Echo, config *config.File, databaseClient database.Client, redisClient rueidis.Client, networkParamsCaller *vsl.NetworkParamsCaller) component.Component { + httpxClient, err := httpx.NewHTTPClient() + if err != nil { + return nil + } + c := &Component{ config: config, databaseClient: databaseClient, redisClient: redisClient, networkParamsCaller: networkParamsCaller, - httpClient: http.DefaultClient, + httpClient: httpxClient, } apiServer.GET("/", c.GetNodeOperator) diff --git a/internal/node/component/info/handler_node.go b/internal/node/component/info/handler_node.go index 3819259f..bc260271 100644 --- a/internal/node/component/info/handler_node.go +++ b/internal/node/component/info/handler_node.go @@ -300,32 +300,16 @@ func (c *Component) sendRequest(ctx context.Context, path string, result any) er internalURL.Path = path - req, err := http.NewRequestWithContext(ctx, http.MethodGet, internalURL.String(), nil) + body, err := c.httpClient.Fetch(ctx, internalURL.String()) if err != nil { - return fmt.Errorf("new request: %w", err) + return fmt.Errorf("fetch request: %w", err) } + defer body.Close() - req.Header.Set("Content-Type", "application/json") - - resp, err := c.httpClient.Do(req) - if err != nil { - return fmt.Errorf("do request: %w", err) - } - - defer func() { - _ = resp.Body.Close() - }() - - if err = json.NewDecoder(resp.Body).Decode(&result); err != nil { + if err = json.NewDecoder(body).Decode(&result); err != nil { return fmt.Errorf("decode response: %w", err) } - if resp.StatusCode != http.StatusOK { - marshal, _ := json.Marshal(result) - - return fmt.Errorf("unexpected status: %s, response: %s", resp.Status, string(marshal)) - } - return nil } diff --git a/internal/node/component/info/handler_worker.go b/internal/node/component/info/handler_worker.go index d826e5a6..6886303e 100644 --- a/internal/node/component/info/handler_worker.go +++ b/internal/node/component/info/handler_worker.go @@ -5,10 +5,13 @@ import ( "encoding/json" "fmt" "net/http" + "net/url" + "path" "sync" "github.com/labstack/echo/v4" "github.com/rss3-network/node/config" + rssx "github.com/rss3-network/node/internal/node/component/rss" "github.com/rss3-network/node/internal/node/monitor" "github.com/rss3-network/node/schema/worker" "github.com/rss3-network/node/schema/worker/decentralized" @@ -24,9 +27,9 @@ type WorkerResponse struct { } type ComponentInfo struct { - Decentralized []*WorkerInfo `json:"decentralized"` - RSS *WorkerInfo `json:"rss"` - Federated []*WorkerInfo `json:"federated"` + Decentralized []*WorkerInfo `json:"decentralized,omitempty"` + RSS *WorkerInfo `json:"rss,omitempty"` + Federated []*WorkerInfo `json:"federated,omitempty"` } type WorkerInfo struct { @@ -46,50 +49,10 @@ func (c *Component) GetWorkersStatus(ctx echo.Context) error { workerCount := config.CalculateWorkerCount(c.config) workerInfoChan := make(chan *WorkerInfo, workerCount) - var response *WorkerResponse - - switch { - case c.redisClient != nil && len(c.config.Component.Decentralized) > 0: - // Fetch all worker info concurrently. - c.fetchAllWorkerInfo(ctx, workerInfoChan) - - // Build the worker response. - response = c.buildWorkerResponse(workerInfoChan) - case c.config.Component.RSS != nil: - m := c.config.Component.RSS - - response = &WorkerResponse{ - Data: ComponentInfo{ - RSS: &WorkerInfo{ - WorkerID: m.ID, - Network: m.Network, - Worker: m.Worker, - Tags: rss.ToTagsMap[m.Worker.(rss.Worker)], - Platform: rss.ToPlatformMap[m.Worker.(rss.Worker)].String(), - Status: worker.StatusReady}, - }, - } - case len(c.config.Component.Federated) > 0: - f := c.config.Component.Federated[0] - switch f.Worker { - case federated.Core: - response = &WorkerResponse{ - Data: ComponentInfo{ - RSS: &WorkerInfo{ - WorkerID: f.ID, - Network: f.Network, - Worker: f.Worker, - Tags: federated.ToTagsMap[federated.Core], - Platform: rss.ToPlatformMap[f.Worker.(rss.Worker)].String(), - Status: worker.StatusReady}, - }, - } - default: - return nil - } - default: - return nil - } + // Fetch the status of all workers concurrently + c.fetchAllWorkerInfo(ctx, workerInfoChan) + + response := c.buildWorkerResponse(workerInfoChan) return ctx.JSON(http.StatusOK, response) } @@ -108,7 +71,19 @@ func (c *Component) fetchAllWorkerInfo(ctx echo.Context, workerInfoChan chan<- * }(w) } - modules := append(append(c.config.Component.Decentralized, c.config.Component.RSS), c.config.Component.Federated...) + modules := make([]*config.Module, 0, config.CalculateWorkerCount(c.config)) + + if len(c.config.Component.Decentralized) > 0 { + modules = append(modules, c.config.Component.Decentralized...) + } + + if len(c.config.Component.Federated) > 0 { + modules = append(modules, c.config.Component.Federated...) + } + + if c.config.Component.RSS != nil { + modules = append(modules, c.config.Component.RSS) + } for _, m := range modules { if m.Network.Protocol() == network.RSSProtocol { @@ -129,17 +104,23 @@ func (c *Component) fetchAllWorkerInfo(ctx echo.Context, workerInfoChan chan<- * // buildWorkerResponse builds the worker response from the worker info channel func (c *Component) buildWorkerResponse(workerInfoChan <-chan *WorkerInfo) *WorkerResponse { response := &WorkerResponse{ - Data: ComponentInfo{ - Decentralized: []*WorkerInfo{}, - RSS: &WorkerInfo{}, - Federated: []*WorkerInfo{}, - }, + Data: ComponentInfo{}, + } + + if len(c.config.Component.Decentralized) > 0 { + response.Data.Decentralized = []*WorkerInfo{} + } + + if len(c.config.Component.Federated) > 0 { + response.Data.Federated = []*WorkerInfo{} } for workerInfo := range workerInfoChan { switch workerInfo.Network.Protocol() { case network.RSSProtocol: - response.Data.RSS = workerInfo + if c.config.Component.RSS != nil { + response.Data.RSS = workerInfo + } case network.EthereumProtocol, network.FarcasterProtocol, network.ArweaveProtocol, network.NearProtocol: response.Data.Decentralized = append(response.Data.Decentralized, workerInfo) case network.ActivityPubProtocol: @@ -162,8 +143,18 @@ func (c *Component) fetchWorkerInfo(ctx context.Context, module *config.Module) } } - // Fetch status and progress from a specific worker by id. - status, workerProgress := c.getWorkerStatusAndProgressByID(ctx, module.ID) + var ( + status worker.Status + workerProgress monitor.WorkerProgress + ) + + if module.Network.Protocol() == network.RSSProtocol { + // Check RSS worker health status + status, _ = c.checkRSSWorkerHealth(ctx, module) + } else { + // Fetch decentralized or federated worker status and progress from a specific worker by id. + status, workerProgress = c.getWorkerStatusAndProgressByID(ctx, module.ID) + } workerInfo := &WorkerInfo{ WorkerID: module.ID, @@ -211,6 +202,39 @@ func (c *Component) fetchWorkerInfo(ctx context.Context, module *config.Module) return workerInfo } +// checkRSSWorkerHealth checks the health of the RSS worker by `healthz` api. +func (c *Component) checkRSSWorkerHealth(ctx context.Context, module *config.Module) (worker.Status, error) { + baseURL, err := url.Parse(module.EndpointID) + if err != nil { + zap.L().Error("invalid RSS endpoint", zap.String("endpoint", module.EndpointID)) + return worker.StatusUnhealthy, err + } + + baseURL.Path = path.Join(baseURL.Path, "healthz") + + // Parse RSS options from module parameters + option, err := rssx.NewOption(module.Parameters) + if err != nil { + zap.L().Error("parse config parameters", zap.Error(err)) + return worker.StatusUnhealthy, err + } + + if option.Authentication.AccessKey != "" { + query := baseURL.Query() + query.Set("key", option.Authentication.AccessKey) + baseURL.RawQuery = query.Encode() + } + + body, err := c.httpClient.Fetch(ctx, baseURL.String()) + if err != nil { + zap.L().Error("fetch RSS healthz", zap.String("endpoint", baseURL.String()), zap.Error(err)) + return worker.StatusUnhealthy, err + } + defer body.Close() + + return worker.StatusReady, nil +} + // getWorkerStatusAndProgressByID gets both worker status and progress from Redis cache by worker ID. func (c *Component) getWorkerStatusAndProgressByID(ctx context.Context, workerID string) (worker.Status, monitor.WorkerProgress) { if c.redisClient == nil { diff --git a/internal/node/indexer/server.go b/internal/node/indexer/server.go index 9ab1cba7..314e28d0 100644 --- a/internal/node/indexer/server.go +++ b/internal/node/indexer/server.go @@ -326,11 +326,6 @@ func NewServer(ctx context.Context, config *config.Module, databaseClient databa if err != nil { return nil, fmt.Errorf("new near monitorClient: %w", err) } - case network.RSSProtocol: - instance.monitorClient, err = monitor.NewRssClient(config.EndpointID, config.Parameters) - if err != nil { - return nil, fmt.Errorf("new rss monitorClient: %w", err) - } } if err := instance.initializeMeter(); err != nil { diff --git a/internal/node/monitor/client.go b/internal/node/monitor/client.go index 003de0c3..36ba59b7 100644 --- a/internal/node/monitor/client.go +++ b/internal/node/monitor/client.go @@ -4,18 +4,14 @@ import ( "context" "errors" "fmt" - "net/url" - "path" "strconv" "time" "github.com/rss3-network/node/config" - "github.com/rss3-network/node/internal/node/component/rss" "github.com/rss3-network/node/provider/activitypub/mastodon" "github.com/rss3-network/node/provider/arweave" "github.com/rss3-network/node/provider/ethereum" "github.com/rss3-network/node/provider/farcaster" - "github.com/rss3-network/node/provider/httpx" "github.com/rss3-network/node/provider/near" "github.com/rss3-network/node/schema/worker" "github.com/rss3-network/node/schema/worker/federated" @@ -211,66 +207,6 @@ func NewFarcasterClient() (Client, error) { return &farcasterClient{}, nil } -// rssClient is a client implementation for rss. -type rssClient struct { - httpClient httpx.Client - url string -} - -// make sure client implements Client -var _ Client = (*rssClient)(nil) - -func (c *rssClient) CurrentState(_ CheckpointState) (uint64, uint64) { - return 0, 0 -} - -func (c *rssClient) TargetState(_ *config.Parameters) (uint64, uint64) { - return 0, 0 -} - -// LatestState requests a URL to check if it can be accessed correctly. -func (c *rssClient) LatestState(ctx context.Context) (uint64, uint64, error) { - _, err := c.httpClient.Fetch(ctx, c.url) - - if err != nil { - return 0, 0, err - } - - return 0, 0, nil -} - -// NewRssClient returns a new rss client. -func NewRssClient(endpoint string, param *config.Parameters) (Client, error) { - base, err := url.Parse(endpoint) - if err != nil { - return nil, fmt.Errorf("parse RSSHub endpoint: %w", err) - } - - // used for health checks - base.Path = path.Join(base.Path, "healthz") - - option, err := rss.NewOption(param) - if err != nil { - return nil, fmt.Errorf("parse config parameters: %w", err) - } - - if option.Authentication.AccessKey != "" { - query := base.Query() - query.Set("key", option.Authentication.AccessKey) - base.RawQuery = query.Encode() - } - - httpClient, err := httpx.NewHTTPClient() - if err != nil { - return nil, err - } - - return &rssClient{ - httpClient: httpClient, - url: base.String(), - }, nil -} - func (c *activitypubClient) CurrentState(_ CheckpointState) (uint64, uint64) { return 0, 0 } diff --git a/internal/node/monitor/monitor.go b/internal/node/monitor/monitor.go index 217fa7f3..6f2bec3e 100644 --- a/internal/node/monitor/monitor.go +++ b/internal/node/monitor/monitor.go @@ -12,7 +12,6 @@ import ( workerx "github.com/rss3-network/node/schema/worker" "github.com/rss3-network/node/schema/worker/decentralized" "github.com/rss3-network/protocol-go/schema/network" - "github.com/samber/lo" "go.uber.org/zap" ) @@ -36,7 +35,7 @@ type WorkerProgress struct { func (m *Monitor) MonitorWorkerStatus(ctx context.Context) error { var wg sync.WaitGroup - errChan := make(chan error, len(m.config.Component.Decentralized)+lo.Ternary(m.config.Component.RSS != nil, 1, 0)+len(m.config.Component.Federated)) + errChan := make(chan error, len(m.config.Component.Decentralized)+len(m.config.Component.Federated)) processWorker := func(w *config.Module, processFunc func(context.Context, *config.Module) error) { wg.Add(1) @@ -54,10 +53,6 @@ func (m *Monitor) MonitorWorkerStatus(ctx context.Context) error { processWorker(w, m.processDecentralizedWorker) } - if m.config.Component.RSS != nil { - processWorker(m.config.Component.RSS, m.processRSSWorker) - } - if m.config.Component.Federated != nil { for _, federatedComponent := range m.config.Component.Federated { processWorker(federatedComponent, m.processFederatedWorker) @@ -159,21 +154,6 @@ func (m *Monitor) processFederatedWorker(ctx context.Context, w *config.Module) return m.UpdateWorkerStatusByID(ctx, w.ID, targetStatus.String()) } -// processRSSWorker processes the rss worker status. -func (m *Monitor) processRSSWorker(ctx context.Context, w *config.Module) error { - client, ok := m.clients[w.Network] - if !ok { - return fmt.Errorf("client not exist") - } - - targetStatus := workerx.StatusReady - if _, _, err := client.LatestState(ctx); err != nil { - targetStatus = workerx.StatusUnhealthy - } - - return m.UpdateWorkerStatusByID(ctx, w.ID, targetStatus.String()) -} - // getWorkerIndexingStateByClients gets the latest block height (arweave), block number (ethereum), event id (farcaster). func (m *Monitor) getWorkerIndexingStateByClients(ctx context.Context, n network.Network, w string, state CheckpointState, param *config.Parameters) (uint64, uint64, uint64, error) { client, ok := m.clients[n] diff --git a/internal/node/monitor/monitor_mock.go b/internal/node/monitor/monitor_mock.go index 94099974..4af35884 100644 --- a/internal/node/monitor/monitor_mock.go +++ b/internal/node/monitor/monitor_mock.go @@ -7,16 +7,14 @@ import ( "github.com/rss3-network/node/config" "github.com/rss3-network/node/config/parameter" - workerx "github.com/rss3-network/node/schema/worker" "github.com/rss3-network/node/schema/worker/decentralized" - "github.com/samber/lo" "go.uber.org/zap" ) func (m *Monitor) MonitorMockWorkerStatus(ctx context.Context, currentState CheckpointState, targetWorkerState, latestState uint64) error { var wg sync.WaitGroup - errChan := make(chan error, len(m.config.Component.Decentralized)+lo.Ternary(m.config.Component.RSS != nil, 1, 0)) + errChan := make(chan error, len(m.config.Component.Decentralized)+len(m.config.Component.Federated)) for _, w := range m.config.Component.Decentralized { wg.Add(1) @@ -30,18 +28,6 @@ func (m *Monitor) MonitorMockWorkerStatus(ctx context.Context, currentState Chec }(w) } - if m.config.Component.RSS != nil { - wg.Add(1) - - go func(w *config.Module) { - defer wg.Done() - - if err := m.processMockRSSWorker(ctx, w); err != nil { - errChan <- err - } - }(m.config.Component.RSS) - } - go func() { wg.Wait() close(errChan) @@ -83,18 +69,3 @@ func (m *Monitor) processMockWorker(ctx context.Context, w *config.Module, curre return nil } - -// processMockRSSWorker processes the rss worker status. -func (m *Monitor) processMockRSSWorker(ctx context.Context, w *config.Module) error { - client, ok := m.clients[w.Network] - if !ok { - return fmt.Errorf("client not exist") - } - - targetStatus := workerx.StatusReady - if _, _, err := client.LatestState(ctx); err != nil { - targetStatus = workerx.StatusUnhealthy - } - - return m.UpdateWorkerStatusByID(ctx, w.ID, targetStatus.String()) -} diff --git a/internal/node/monitor/monitor_test.go b/internal/node/monitor/monitor_test.go index be7a2855..f720a261 100644 --- a/internal/node/monitor/monitor_test.go +++ b/internal/node/monitor/monitor_test.go @@ -13,7 +13,6 @@ import ( redisx "github.com/rss3-network/node/provider/redis" "github.com/rss3-network/node/schema/worker" "github.com/rss3-network/node/schema/worker/decentralized" - "github.com/rss3-network/node/schema/worker/rss" "github.com/rss3-network/protocol-go/schema/network" "github.com/stretchr/testify/require" ) @@ -931,50 +930,6 @@ func TestMonitor(t *testing.T) { want: worker.StatusIndexing, wantError: require.NoError, }, - - // RSS rsshub - { - name: "RSSHub Worker Ready Status -> Unhealthy Status", - source: network.RSSProtocol, - arguments: arguments{ - config: &config.File{ - Component: &config.Component{ - RSS: &config.Module{ - ID: "rss-rsshub", - Network: network.RSSHub, - Worker: rss.Core, - EndpointID: "https://rsshub3.bruce.com", - }, - }, - }, - currentState: monitor.CheckpointState{}, - latestState: 0, - initialStatus: worker.StatusReady, - }, - want: worker.StatusUnhealthy, - wantError: require.NoError, - }, - { - name: "Rsshub Worker Ready Status -> Ready Status", - source: network.RSSProtocol, - arguments: arguments{ - config: &config.File{ - Component: &config.Component{ - RSS: &config.Module{ - ID: "rss-rsshub", - Network: network.RSSHub, - Worker: rss.Core, - EndpointID: "https://rsshub.app", - }, - }, - }, - currentState: monitor.CheckpointState{}, - latestState: 0, - initialStatus: worker.StatusReady, - }, - want: worker.StatusReady, - wantError: require.NoError, - }, } // Start Redis container @@ -1001,7 +956,7 @@ func TestMonitor(t *testing.T) { testcase := testcase switch testcase.source { - case network.FarcasterProtocol, network.ArweaveProtocol, network.EthereumProtocol, network.NearProtocol: + case network.FarcasterProtocol, network.ArweaveProtocol, network.EthereumProtocol, network.NearProtocol, network.ActivityPubProtocol: t.Run(testcase.name, func(t *testing.T) { ctx := context.Background() @@ -1024,25 +979,6 @@ func TestMonitor(t *testing.T) { status := instance.GetWorkerStatusByID(ctx, testcase.arguments.config.Component.Decentralized[0].ID) require.Equal(t, testcase.want, status) }) - case network.RSSProtocol: - t.Run(testcase.name, func(t *testing.T) { - ctx := context.Background() - - instance, err := monitor.NewMonitor(ctx, testcase.arguments.config, nil, redisClient, nil, nil) - require.NoError(t, err) - - // update worker status to initial status - err = instance.UpdateWorkerStatusByID(ctx, testcase.arguments.config.Component.RSS.ID, testcase.arguments.initialStatus.String()) - require.NoError(t, err) - - // run monitor - err = instance.MonitorMockWorkerStatus(ctx, testcase.arguments.currentState, testcase.arguments.targetState, testcase.arguments.latestState) - require.NoError(t, err) - - // check final worker status - status := instance.GetWorkerStatusByID(ctx, testcase.arguments.config.Component.RSS.ID) - require.Equal(t, testcase.want, status) - }) default: } } diff --git a/internal/node/monitor/server.go b/internal/node/monitor/server.go index 95afbaf7..a5d52026 100644 --- a/internal/node/monitor/server.go +++ b/internal/node/monitor/server.go @@ -97,8 +97,6 @@ func initNetworkClient(m *config.Module) (Client, error) { client, err = NewArweaveClient() case network.FarcasterProtocol: client, err = NewFarcasterClient() - case network.RSSProtocol: - client, err = NewRssClient(m.EndpointID, m.Parameters) case network.EthereumProtocol: client, err = NewEthereumClient(m.Endpoint) case network.NearProtocol: