Skip to content

Commit

Permalink
refactor: Move timeline log sink under timelineclient package (#4774)
Browse files Browse the repository at this point in the history
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
jvmakine and github-actions[bot] authored Mar 4, 2025
1 parent f2f94d8 commit f7d5881
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 73 deletions.
30 changes: 0 additions & 30 deletions backend/runner/deployment_logs.go

This file was deleted.

46 changes: 4 additions & 42 deletions backend/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import (
"github.com/block/ftl/common/plugin"
"github.com/block/ftl/common/schema"
"github.com/block/ftl/common/slices"
"github.com/block/ftl/internal/channels"
"github.com/block/ftl/internal/download"
"github.com/block/ftl/internal/dsn"
"github.com/block/ftl/internal/exec"
Expand Down Expand Up @@ -130,8 +129,8 @@ func Start(ctx context.Context, config Config, storage *artefacts.OCIArtefactSer
controllerClient: controllerClient,
schemaClient: schemaClient,
timelineClient: timelineClient,
timelineLogSink: timeline.NewLogSink(timelineClient, log.Debug),
labels: labels,
deploymentLogQueue: make(chan log.Entry, 10000),
cancelFunc: doneFunc,
devEndpoint: config.DevEndpoint,
devHotReloadEndpoint: config.DevHotReloadEndpoint,
Expand Down Expand Up @@ -178,7 +177,7 @@ func (s *Service) startDeployment(ctx context.Context, key key.Deployment, modul
// It is managed externally by the scaling system
return err
}
go s.streamLogsLoop(ctx)
go s.timelineLogSink.RunLogLoop(ctx)
go func() {
go rpc.RetryStreamingClientStream(ctx, backoff.Backoff{}, s.controllerClient.RegisterRunner, s.registrationLoop)
}()
Expand Down Expand Up @@ -269,10 +268,10 @@ type Service struct {
controllerClient ftlv1connect.ControllerServiceClient
schemaClient ftlv1connect.SchemaServiceClient
timelineClient *timeline.Client
timelineLogSink *timeline.LogSink
// Failed to register with the Controller
registrationFailure atomic.Value[optional.Option[error]]
labels *structpb.Struct
deploymentLogQueue chan log.Entry
cancelFunc context.CancelCauseFunc
devEndpoint optional.Option[string]
devHotReloadEndpoint optional.Option[string]
Expand Down Expand Up @@ -627,51 +626,14 @@ func (s *Service) registrationLoop(ctx context.Context, send func(request *ftlv1
return nil
}

func (s *Service) streamLogsLoop(ctx context.Context) {
for entry := range channels.IterContext(ctx, s.deploymentLogQueue) {
dep, ok := entry.Attributes["deployment"]
var deploymentKey key.Deployment
var err error
if ok {
deploymentKey, err = key.ParseDeploymentKey(dep)
if err != nil {
continue
}
}

var errorString *string
if entry.Error != nil {
errStr := entry.Error.Error()
errorString = &errStr
}
var request optional.Option[key.Request]
if reqStr, ok := entry.Attributes["request"]; ok {
req, err := key.ParseRequestKey(reqStr) //nolint:errcheck // best effort
if err == nil {
request = optional.Some(req)
}
}
s.timelineClient.Publish(ctx, &timeline.Log{
DeploymentKey: deploymentKey,
RequestKey: request,
Level: int32(entry.Level),
Time: entry.Time,
Attributes: entry.Attributes,
Message: entry.Message,
Error: optional.Ptr(errorString),
})
}
}

func (s *Service) getDeploymentLogger(ctx context.Context, deploymentKey key.Deployment) *log.Logger {
attrs := map[string]string{"deployment": deploymentKey.String()}
if requestKey, _ := rpc.RequestKeyFromContext(ctx); requestKey.Ok() { //nolint:errcheck // best effort
attrs["request"] = requestKey.MustGet().String()
}
ctx = ftlobservability.AddSpanContextToLogger(ctx)

sink := newDeploymentLogsSink(s.deploymentLogQueue)
return log.FromContext(ctx).AddSink(sink).Attrs(attrs)
return log.FromContext(ctx).AddSink(s.timelineLogSink).Attrs(attrs)
}

func (s *Service) healthCheck(writer http.ResponseWriter, request *http.Request) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/ftl-admin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func main() {
kctx.FatalIfErrorf(err)
asmSecretProvider := providers.NewASM(secretsmanager.NewFromConfig(awsConfig))
dbSecretResolver := routers.NewFileRouter[cf.Secrets](cli.Secrets)
sm, err := manager.New[cf.Secrets](ctx, dbSecretResolver, asmSecretProvider)
sm, err := manager.New(ctx, dbSecretResolver, asmSecretProvider)
kctx.FatalIfErrorf(err)

schemaClient := rpc.Dial(ftlv1connect.NewSchemaServiceClient, cli.SchemaEndpoint.String(), log.Error)
Expand Down
85 changes: 85 additions & 0 deletions internal/timelineclient/log_sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package timelineclient

import (
"context"
"fmt"

"github.com/alecthomas/types/optional"

"github.com/block/ftl/internal/channels"
"github.com/block/ftl/internal/key"
"github.com/block/ftl/internal/log"
)

// LogSink is a log sink that sends logs to the timeline client.
//
// It needs to be run in a separate goroutine after creation by calling RunLogLoop.
type LogSink struct {
client *Client
logQueue chan log.Entry
level log.Level
}

var _ log.Sink = (*LogSink)(nil)

func NewLogSink(client *Client, level log.Level) *LogSink {
return &LogSink{
client: client,
logQueue: make(chan log.Entry, 10000),
level: level,
}
}

// Log implements Sink
func (l *LogSink) Log(entry log.Entry) error {
if entry.Level < l.level {
return nil
}

select {
case l.logQueue <- entry:
default:
// Drop log entry if queue is full
return fmt.Errorf("log queue is full")
}
return nil
}

// RunLogLoop runs the log loop.
//
// It will run until the context is cancelled.
func (l *LogSink) RunLogLoop(ctx context.Context) {
for entry := range channels.IterContext(ctx, l.logQueue) {
dep, ok := entry.Attributes["deployment"]
var deploymentKey key.Deployment
var err error
if ok {
deploymentKey, err = key.ParseDeploymentKey(dep)
if err != nil {
continue
}
}

var errorString *string
if entry.Error != nil {
errStr := entry.Error.Error()
errorString = &errStr
}
var request optional.Option[key.Request]
if reqStr, ok := entry.Attributes["request"]; ok {
req, err := key.ParseRequestKey(reqStr) //nolint:errcheck // best effort
if err == nil {
request = optional.Some(req)
}
}
l.client.Publish(ctx, &Log{
DeploymentKey: deploymentKey,
RequestKey: request,
Level: int32(entry.Level), //nolint:gosec
Time: entry.Time,
Attributes: entry.Attributes,
Message: entry.Message,
Error: optional.Ptr(errorString),
})
}
}

0 comments on commit f7d5881

Please # to comment.