Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

fix: /workers_status api fetchWorker logics #625

Merged
merged 7 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 65 additions & 62 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,26 +91,8 @@ var command = cobra.Command{

var settlementCaller *vsl.SettlementCaller

// Apply database migrations for all modules except the broadcaster.
if module != BroadcasterArg && !config.IsRSSComponentOnly(configFile) {
databaseClient, err = dialer.Dial(cmd.Context(), configFile.Database)
if err != nil {
return fmt.Errorf("dial database: %w", err)
}

tx, err := databaseClient.Begin(cmd.Context())
if err != nil {
return fmt.Errorf("begin transaction: %w", err)
}

if err := tx.Migrate(cmd.Context()); err != nil {
err := tx.Rollback()
if err != nil {
return fmt.Errorf("rollback database: %w", err)
}
return fmt.Errorf("migrate database: %w", err)
}

// Broadcaster does not need Redis, DB and Network Params Client
if module != BroadcasterArg {
// Init a Redis client.
if configFile.Redis == nil {
zap.L().Error("redis configFile is missing")
Expand All @@ -122,61 +104,82 @@ var command = cobra.Command{
return fmt.Errorf("new redis client: %w", err)
}

vslClient, err := parameter.InitVSLClient()
if err != nil {
return fmt.Errorf("init vsl client: %w", err)
}
// DB and Network Params Client is not needed for RSS component only Node
if !config.IsRSSComponentOnly(configFile) {
databaseClient, err = dialer.Dial(cmd.Context(), configFile.Database)
if err != nil {
return fmt.Errorf("dial database: %w", err)
}

chainID, err := vslClient.ChainID(context.Background())
if err != nil {
return fmt.Errorf("get chain id: %w", err)
}
tx, err := databaseClient.Begin(cmd.Context())
if err != nil {
return fmt.Errorf("begin transaction: %w", err)
}

networkParamsCaller, err = vsl.NewNetworkParamsCaller(vsl.AddressNetworkParams[chainID.Int64()], vslClient)
if err != nil {
return fmt.Errorf("new network params caller: %w", err)
}
if err := tx.Migrate(cmd.Context()); err != nil {
err := tx.Rollback()
if err != nil {
return fmt.Errorf("rollback database: %w", err)
}
return fmt.Errorf("migrate database: %w", err)
}

settlementCaller, err = vsl.NewSettlementCaller(vsl.AddressSettlement[chainID.Int64()], vslClient)
if err != nil {
return fmt.Errorf("new settlement caller: %w", err)
}
vslClient, err := parameter.InitVSLClient()
if err != nil {
return fmt.Errorf("init vsl client: %w", err)
}

epoch, err := parameter.GetCurrentEpochFromVSL(settlementCaller)
if err != nil {
return fmt.Errorf("get current epoch: %w", err)
}
chainID, err := vslClient.ChainID(context.Background())
if err != nil {
return fmt.Errorf("get chain id: %w", err)
}

// save epoch to redis cache
err = parameter.UpdateCurrentEpoch(cmd.Context(), redisClient, epoch)
if err != nil {
return fmt.Errorf("update current epoch: %w", err)
}
networkParamsCaller, err = vsl.NewNetworkParamsCaller(vsl.AddressNetworkParams[chainID.Int64()], vslClient)
if err != nil {
return fmt.Errorf("new network params caller: %w", err)
}

// when start or restart the core, worker or monitor module, it will pull network parameters from VSL and record current epoch
if _, err = parameter.PullNetworkParamsFromVSL(networkParamsCaller, uint64(epoch)); err != nil {
zap.L().Error("pull network parameters from VSL", zap.Error(err))
settlementCaller, err = vsl.NewSettlementCaller(vsl.AddressSettlement[chainID.Int64()], vslClient)
if err != nil {
return fmt.Errorf("new settlement caller: %w", err)
}

return fmt.Errorf("pull network parameters from VSL: %w", err)
}
epoch, err := parameter.GetCurrentEpochFromVSL(settlementCaller)
if err != nil {
return fmt.Errorf("get current epoch: %w", err)
}

for network, blockStart := range parameter.CurrentNetworkStartBlock {
if blockStart == nil {
continue // Skip if the start block is not defined.
// save epoch to redis cache
err = parameter.UpdateCurrentEpoch(cmd.Context(), redisClient, epoch)
if err != nil {
return fmt.Errorf("update current epoch: %w", err)
}

// Convert big.Int to int64; safe as long as the value fits in int64.
blockStartInt64 := blockStart.Block.Int64()
// when start or restart the core, worker or monitor module, it will pull network parameters from VSL and record current epoch
if _, err = parameter.PullNetworkParamsFromVSL(networkParamsCaller, uint64(epoch)); err != nil {
zap.L().Error("pull network parameters from VSL", zap.Error(err))

// Update the current block start for the network in Redis.
err := parameter.UpdateBlockStart(cmd.Context(), redisClient, network.String(), blockStartInt64)
if err != nil {
return fmt.Errorf("update current block start: %w", err)
return fmt.Errorf("pull network parameters from VSL: %w", err)
}
}

if err := tx.Commit(); err != nil {
return fmt.Errorf("commit transaction: %w", err)
for network, blockStart := range parameter.CurrentNetworkStartBlock {
if blockStart == nil {
continue // Skip if the start block is not defined.
}

// Convert big.Int to int64; safe as long as the value fits in int64.
blockStartInt64 := blockStart.Block.Int64()

// Update the current block start for the network in Redis.
err := parameter.UpdateBlockStart(cmd.Context(), redisClient, network.String(), blockStartInt64)
if err != nil {
return fmt.Errorf("update current block start: %w", err)
}
}

if err := tx.Commit(); err != nil {
return fmt.Errorf("commit transaction: %w", err)
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type File struct {
Component *Component `mapstructure:"component" validate:"required"`
Database *Database `mapstructure:"database" validate:"required"`
Stream *Stream `mapstructure:"stream"`
Redis *Redis `mapstructure:"redis"`
Redis *Redis `mapstructure:"redis" validate:"required"`
Observability *Telemetry `mapstructure:"observability"`
}

Expand Down
86 changes: 33 additions & 53 deletions internal/node/component/info/handler_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ type WorkerResponse struct {
}

type ComponentInfo struct {
Decentralized []*WorkerInfo `json:"decentralized"`
RSS *WorkerInfo `json:"rss"`
Federated []*WorkerInfo `json:"federated"`
Decentralized []*WorkerInfo `json:"decentralized,omitempty"`
RSS *WorkerInfo `json:"rss,omitempty"`
Federated []*WorkerInfo `json:"federated,omitempty"`
}

type WorkerInfo struct {
Expand All @@ -46,51 +46,13 @@ func (c *Component) GetWorkersStatus(ctx echo.Context) error {
workerCount := config.CalculateWorkerCount(c.config)
workerInfoChan := make(chan *WorkerInfo, workerCount)

var response *WorkerResponse

switch {
case c.redisClient != nil && len(c.config.Component.Decentralized) > 0:
// Fetch all worker info concurrently.
c.fetchAllWorkerInfo(ctx, workerInfoChan)

// Build the worker response.
response = c.buildWorkerResponse(workerInfoChan)
case c.config.Component.RSS != nil:
m := c.config.Component.RSS

response = &WorkerResponse{
Data: ComponentInfo{
RSS: &WorkerInfo{
WorkerID: m.ID,
Network: m.Network,
Worker: m.Worker,
Tags: rss.ToTagsMap[m.Worker.(rss.Worker)],
Platform: rss.ToPlatformMap[m.Worker.(rss.Worker)].String(),
Status: worker.StatusReady},
},
}
case len(c.config.Component.Federated) > 0:
f := c.config.Component.Federated[0]
switch f.Worker {
case federated.Core:
response = &WorkerResponse{
Data: ComponentInfo{
RSS: &WorkerInfo{
WorkerID: f.ID,
Network: f.Network,
Worker: f.Worker,
Tags: federated.ToTagsMap[federated.Core],
Platform: rss.ToPlatformMap[f.Worker.(rss.Worker)].String(),
Status: worker.StatusReady},
},
}
default:
return nil
}
default:
return nil
if c.redisClient == nil {
return ctx.JSON(http.StatusInternalServerError, "redis client is required for monitor module")
}

c.fetchAllWorkerInfo(ctx, workerInfoChan)
response := c.buildWorkerResponse(workerInfoChan)

return ctx.JSON(http.StatusOK, response)
}

Expand All @@ -108,7 +70,19 @@ func (c *Component) fetchAllWorkerInfo(ctx echo.Context, workerInfoChan chan<- *
}(w)
}

modules := append(append(c.config.Component.Decentralized, c.config.Component.RSS), c.config.Component.Federated...)
modules := make([]*config.Module, 0, config.CalculateWorkerCount(c.config))

if len(c.config.Component.Decentralized) > 0 {
modules = append(modules, c.config.Component.Decentralized...)
}

if len(c.config.Component.Federated) > 0 {
modules = append(modules, c.config.Component.Federated...)
}

if c.config.Component.RSS != nil {
modules = append(modules, c.config.Component.RSS)
}

for _, m := range modules {
if m.Network.Protocol() == network.RSSProtocol {
Expand All @@ -129,17 +103,23 @@ func (c *Component) fetchAllWorkerInfo(ctx echo.Context, workerInfoChan chan<- *
// buildWorkerResponse builds the worker response from the worker info channel
func (c *Component) buildWorkerResponse(workerInfoChan <-chan *WorkerInfo) *WorkerResponse {
response := &WorkerResponse{
Data: ComponentInfo{
Decentralized: []*WorkerInfo{},
RSS: &WorkerInfo{},
Federated: []*WorkerInfo{},
},
Data: ComponentInfo{},
}

if len(c.config.Component.Decentralized) > 0 {
response.Data.Decentralized = []*WorkerInfo{}
}

if len(c.config.Component.Federated) > 0 {
response.Data.Federated = []*WorkerInfo{}
}

for workerInfo := range workerInfoChan {
switch workerInfo.Network.Protocol() {
case network.RSSProtocol:
response.Data.RSS = workerInfo
if c.config.Component.RSS != nil {
response.Data.RSS = workerInfo
}
case network.EthereumProtocol, network.FarcasterProtocol, network.ArweaveProtocol, network.NearProtocol:
response.Data.Decentralized = append(response.Data.Decentralized, workerInfo)
case network.ActivityPubProtocol:
Expand Down
Loading