Skip to content

Commit

Permalink
feat: require monitor to get rsshub and federated workers
Browse files Browse the repository at this point in the history
  • Loading branch information
pseudoyu committed Nov 1, 2024
1 parent c3c5c07 commit 7706981
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 106 deletions.
127 changes: 65 additions & 62 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,26 +91,8 @@ var command = cobra.Command{

var settlementCaller *vsl.SettlementCaller

// Apply database migrations for all modules except the broadcaster.
if module != BroadcasterArg && !config.IsRSSComponentOnly(configFile) {
databaseClient, err = dialer.Dial(cmd.Context(), configFile.Database)
if err != nil {
return fmt.Errorf("dial database: %w", err)
}

tx, err := databaseClient.Begin(cmd.Context())
if err != nil {
return fmt.Errorf("begin transaction: %w", err)
}

if err := tx.Migrate(cmd.Context()); err != nil {
err := tx.Rollback()
if err != nil {
return fmt.Errorf("rollback database: %w", err)
}
return fmt.Errorf("migrate database: %w", err)
}

// Broadcaster does not need Redis, DB and Network Params Client
if module != BroadcasterArg {
// Init a Redis client.
if configFile.Redis == nil {
zap.L().Error("redis configFile is missing")
Expand All @@ -122,61 +104,82 @@ var command = cobra.Command{
return fmt.Errorf("new redis client: %w", err)
}

vslClient, err := parameter.InitVSLClient()
if err != nil {
return fmt.Errorf("init vsl client: %w", err)
}
// DB and Network Params Client is not needed for RSS component only Node
if !config.IsRSSComponentOnly(configFile) {
databaseClient, err = dialer.Dial(cmd.Context(), configFile.Database)
if err != nil {
return fmt.Errorf("dial database: %w", err)
}

chainID, err := vslClient.ChainID(context.Background())
if err != nil {
return fmt.Errorf("get chain id: %w", err)
}
tx, err := databaseClient.Begin(cmd.Context())
if err != nil {
return fmt.Errorf("begin transaction: %w", err)
}

networkParamsCaller, err = vsl.NewNetworkParamsCaller(vsl.AddressNetworkParams[chainID.Int64()], vslClient)
if err != nil {
return fmt.Errorf("new network params caller: %w", err)
}
if err := tx.Migrate(cmd.Context()); err != nil {
err := tx.Rollback()
if err != nil {
return fmt.Errorf("rollback database: %w", err)
}
return fmt.Errorf("migrate database: %w", err)
}

settlementCaller, err = vsl.NewSettlementCaller(vsl.AddressSettlement[chainID.Int64()], vslClient)
if err != nil {
return fmt.Errorf("new settlement caller: %w", err)
}
vslClient, err := parameter.InitVSLClient()
if err != nil {
return fmt.Errorf("init vsl client: %w", err)
}

epoch, err := parameter.GetCurrentEpochFromVSL(settlementCaller)
if err != nil {
return fmt.Errorf("get current epoch: %w", err)
}
chainID, err := vslClient.ChainID(context.Background())
if err != nil {
return fmt.Errorf("get chain id: %w", err)
}

// save epoch to redis cache
err = parameter.UpdateCurrentEpoch(cmd.Context(), redisClient, epoch)
if err != nil {
return fmt.Errorf("update current epoch: %w", err)
}
networkParamsCaller, err = vsl.NewNetworkParamsCaller(vsl.AddressNetworkParams[chainID.Int64()], vslClient)
if err != nil {
return fmt.Errorf("new network params caller: %w", err)
}

// when start or restart the core, worker or monitor module, it will pull network parameters from VSL and record current epoch
if _, err = parameter.PullNetworkParamsFromVSL(networkParamsCaller, uint64(epoch)); err != nil {
zap.L().Error("pull network parameters from VSL", zap.Error(err))
settlementCaller, err = vsl.NewSettlementCaller(vsl.AddressSettlement[chainID.Int64()], vslClient)
if err != nil {
return fmt.Errorf("new settlement caller: %w", err)
}

return fmt.Errorf("pull network parameters from VSL: %w", err)
}
epoch, err := parameter.GetCurrentEpochFromVSL(settlementCaller)
if err != nil {
return fmt.Errorf("get current epoch: %w", err)
}

for network, blockStart := range parameter.CurrentNetworkStartBlock {
if blockStart == nil {
continue // Skip if the start block is not defined.
// save epoch to redis cache
err = parameter.UpdateCurrentEpoch(cmd.Context(), redisClient, epoch)
if err != nil {
return fmt.Errorf("update current epoch: %w", err)
}

// Convert big.Int to int64; safe as long as the value fits in int64.
blockStartInt64 := blockStart.Block.Int64()
// when start or restart the core, worker or monitor module, it will pull network parameters from VSL and record current epoch
if _, err = parameter.PullNetworkParamsFromVSL(networkParamsCaller, uint64(epoch)); err != nil {
zap.L().Error("pull network parameters from VSL", zap.Error(err))

// Update the current block start for the network in Redis.
err := parameter.UpdateBlockStart(cmd.Context(), redisClient, network.String(), blockStartInt64)
if err != nil {
return fmt.Errorf("update current block start: %w", err)
return fmt.Errorf("pull network parameters from VSL: %w", err)
}
}

if err := tx.Commit(); err != nil {
return fmt.Errorf("commit transaction: %w", err)
for network, blockStart := range parameter.CurrentNetworkStartBlock {
if blockStart == nil {
continue // Skip if the start block is not defined.
}

// Convert big.Int to int64; safe as long as the value fits in int64.
blockStartInt64 := blockStart.Block.Int64()

// Update the current block start for the network in Redis.
err := parameter.UpdateBlockStart(cmd.Context(), redisClient, network.String(), blockStartInt64)
if err != nil {
return fmt.Errorf("update current block start: %w", err)
}
}

if err := tx.Commit(); err != nil {
return fmt.Errorf("commit transaction: %w", err)
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type File struct {
Component *Component `mapstructure:"component" validate:"required"`
Database *Database `mapstructure:"database" validate:"required"`
Stream *Stream `mapstructure:"stream"`
Redis *Redis `mapstructure:"redis"`
Redis *Redis `mapstructure:"redis" validate:"required"`
Observability *Telemetry `mapstructure:"observability"`
}

Expand Down
53 changes: 10 additions & 43 deletions internal/node/component/info/handler_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,51 +46,13 @@ func (c *Component) GetWorkersStatus(ctx echo.Context) error {
workerCount := config.CalculateWorkerCount(c.config)
workerInfoChan := make(chan *WorkerInfo, workerCount)

// Handle redis + decentralized case first
if c.redisClient != nil && len(c.config.Component.Decentralized) > 0 {
if c.redisClient != nil {
c.fetchAllWorkerInfo(ctx, workerInfoChan)
response := c.buildWorkerResponse(workerInfoChan)

return ctx.JSON(http.StatusOK, response)
}

// Logic for RSS and Federated
response := &WorkerResponse{
Data: ComponentInfo{},
}

// Handle RSS if exists
if c.config.Component.RSS != nil {
m := c.config.Component.RSS
response.Data.RSS = &WorkerInfo{
WorkerID: m.ID,
Network: m.Network,
Worker: m.Worker,
Tags: rss.ToTagsMap[m.Worker.(rss.Worker)],
Platform: rss.ToPlatformMap[m.Worker.(rss.Worker)].String(),
Status: worker.StatusReady,
}
}

// Handle Federated if exists
if len(c.config.Component.Federated) > 0 {
f := c.config.Component.Federated[0]
if f.Worker == federated.Core {
response.Data.Federated = []*WorkerInfo{{
WorkerID: f.ID,
Network: f.Network,
Worker: f.Worker,
Tags: federated.ToTagsMap[federated.Core],
Platform: federated.ToPlatformMap[federated.Core].String(),
Status: worker.StatusReady,
}}
}
}

if response.Data.RSS != nil || len(response.Data.Federated) > 0 {
return ctx.JSON(http.StatusOK, response)
}

return nil
}

Expand Down Expand Up @@ -141,10 +103,15 @@ func (c *Component) fetchAllWorkerInfo(ctx echo.Context, workerInfoChan chan<- *
// buildWorkerResponse builds the worker response from the worker info channel
func (c *Component) buildWorkerResponse(workerInfoChan <-chan *WorkerInfo) *WorkerResponse {
response := &WorkerResponse{
Data: ComponentInfo{
Decentralized: []*WorkerInfo{},
Federated: []*WorkerInfo{},
},
Data: ComponentInfo{},
}

if len(c.config.Component.Decentralized) > 0 {
response.Data.Decentralized = []*WorkerInfo{}
}

if len(c.config.Component.Federated) > 0 {
response.Data.Federated = []*WorkerInfo{}
}

for workerInfo := range workerInfoChan {
Expand Down

0 comments on commit 7706981

Please # to comment.