From 6988514d709964a36684b80ace803fda281b8bbd Mon Sep 17 00:00:00 2001 From: Juho Makinen Date: Tue, 4 Mar 2025 16:22:38 +1100 Subject: [PATCH 1/5] fix: send provisioner info logs to the timeline --- backend/provisioner/service.go | 10 +++++++++- cmd/ftl-provisioner/main.go | 5 ++++- cmd/ftl/cmd_serve.go | 2 +- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/backend/provisioner/service.go b/backend/provisioner/service.go index 04ba76930b..48bcf7b994 100644 --- a/backend/provisioner/service.go +++ b/backend/provisioner/service.go @@ -24,6 +24,7 @@ import ( "github.com/block/ftl/internal/key" "github.com/block/ftl/internal/log" "github.com/block/ftl/internal/schema/schemaeventsource" + timeline "github.com/block/ftl/internal/timelineclient" ) // CommonProvisionerConfig is shared config between the production controller and development server. @@ -35,6 +36,7 @@ type CommonProvisionerConfig struct { type Config struct { ControllerEndpoint *url.URL `name:"ftl-controller-endpoint" help:"Controller endpoint." env:"FTL_CONTROLLER_ENDPOINT" default:"http://127.0.0.1:8893"` SchemaEndpoint *url.URL `help:"Schema service endpoint." env:"FTL_SCHEMA_ENDPOINT" default:"http://127.0.0.1:8897"` + TimelineEndpoint *url.URL `help:"Timeline endpoint." env:"FTL_TIMELINE_ENDPOINT" default:"http://127.0.0.1:8894"` CommonProvisionerConfig } @@ -75,9 +77,15 @@ func Start( ctx context.Context, registry *ProvisionerRegistry, schemaClient schemaconnect.SchemaServiceClient, + timelineClient *timeline.Client, ) error { - logger := log.FromContext(ctx) + timelineLogSink := timeline.NewLogSink(timelineClient, log.Trace) + go timelineLogSink.RunLogLoop(ctx) + + logger := log.FromContext(ctx).AddSink(timelineLogSink) + ctx = log.ContextWithLogger(ctx, logger) + logger.Debugf("Starting FTL provisioner") svc, err := New(ctx, registry, schemaClient) diff --git a/cmd/ftl-provisioner/main.go b/cmd/ftl-provisioner/main.go index e97214045b..a314c5f449 100644 --- a/cmd/ftl-provisioner/main.go +++ b/cmd/ftl-provisioner/main.go @@ -17,6 +17,7 @@ import ( "github.com/block/ftl/internal/observability" _ "github.com/block/ftl/internal/prodinit" "github.com/block/ftl/internal/rpc" + timeline "github.com/block/ftl/internal/timelineclient" ) var cli struct { @@ -36,6 +37,8 @@ func main() { ) cli.ProvisionerConfig.SetDefaults() + timelineClient := timeline.NewClient(context.Background(), cli.ProvisionerConfig.TimelineEndpoint) + logger := log.Configure(os.Stderr, cli.LogConfig) ctx := log.ContextWithLogger(context.Background(), logger) err := observability.Init(ctx, false, "", "ftl-provisioner", ftl.Version, cli.ObservabilityConfig) @@ -69,6 +72,6 @@ func main() { logger.Debugf("Registered provisioner %s as fallback for runner", runnerBinding) } - err = provisioner.Start(ctx, registry, schemaClient) + err = provisioner.Start(ctx, registry, schemaClient, timelineClient) kctx.FatalIfErrorf(err, "failed to start provisioner") } diff --git a/cmd/ftl/cmd_serve.go b/cmd/ftl/cmd_serve.go index 69df34dba7..563c816f78 100644 --- a/cmd/ftl/cmd_serve.go +++ b/cmd/ftl/cmd_serve.go @@ -324,7 +324,7 @@ func (s *serveCommonConfig) run( } wg.Go(func() error { - if err := provisioner.Start(provisionerCtx, provisionerRegistry, schemaClient); err != nil { + if err := provisioner.Start(provisionerCtx, provisionerRegistry, schemaClient, timelineClient); err != nil { logger.Errorf(err, "provisionerfailed: %v", err) return fmt.Errorf("provisionerfailed: %w", err) } From 2c1c34d34651d6b6ca7ffd706c3ae5a206a635bf Mon Sep 17 00:00:00 2001 From: Juho Makinen Date: Tue, 4 Mar 2025 16:33:13 +1100 Subject: [PATCH 2/5] chore: add deployment key to provisioning runs in provisioner --- backend/provisioner/service.go | 2 ++ cmd/ftl-provisioner-cloudformation/provisioner.go | 2 +- cmd/ftl-provisioner-cloudformation/status.go | 6 +++++- cmd/ftl-provisioner-sandbox/provisioner.go | 2 +- internal/log/logger.go | 6 ++++++ internal/provisioner/executor_test.go | 2 +- internal/provisioner/runner.go | 12 +++++++----- 7 files changed, 23 insertions(+), 9 deletions(-) diff --git a/backend/provisioner/service.go b/backend/provisioner/service.go index 48bcf7b994..d065f77488 100644 --- a/backend/provisioner/service.go +++ b/backend/provisioner/service.go @@ -193,6 +193,7 @@ func RegistryFromConfigFile(ctx context.Context, workingDir string, file *os.Fil return registry, nil } + func (s *Service) HandleChangesetPrepared(ctx context.Context, req key.Changeset) error { _, err := s.schemaClient.CommitChangeset(ctx, connect.NewRequest(&ftlv1.CommitChangesetRequest{Changeset: req.String()})) @@ -201,6 +202,7 @@ func (s *Service) HandleChangesetPrepared(ctx context.Context, req key.Changeset } return nil } + func (s *Service) HandleChangesetCommitted(ctx context.Context, req *schema.Changeset) error { go func() { time.Sleep(time.Second * 5) diff --git a/cmd/ftl-provisioner-cloudformation/provisioner.go b/cmd/ftl-provisioner-cloudformation/provisioner.go index 14a1dc147f..3a76de76b8 100644 --- a/cmd/ftl-provisioner-cloudformation/provisioner.go +++ b/cmd/ftl-provisioner-cloudformation/provisioner.go @@ -117,7 +117,7 @@ func (c *CloudformationProvisioner) Provision(ctx context.Context, req *connect. return nil, fmt.Errorf("provisioner already running: %s", stackID) } logger.Debugf("Starting task for module %s: %s", req.Msg.DesiredModule.Name, stackID) - task.Start(ctx, module.Name) + task.Start(ctx, module.Name, module.Runtime.Deployment.DeploymentKey) return connect.NewResponse(&provisionerpb.ProvisionResponse{ Status: provisionerpb.ProvisionResponse_PROVISION_RESPONSE_STATUS_SUBMITTED, ProvisioningToken: stackID, diff --git a/cmd/ftl-provisioner-cloudformation/status.go b/cmd/ftl-provisioner-cloudformation/status.go index 3eafe92a27..bb4ddfe3e5 100644 --- a/cmd/ftl-provisioner-cloudformation/status.go +++ b/cmd/ftl-provisioner-cloudformation/status.go @@ -23,7 +23,11 @@ func (c *CloudformationProvisioner) Status(ctx context.Context, req *connect.Req // in that case, we start a new task to query the existing stack task, loaded := c.running.LoadOrStore(token, &provisioner.Task{}) if !loaded { - task.Start(ctx, req.Msg.DesiredModule.Name) + dk, err := key.ParseDeploymentKey(token) + if err != nil { + return nil, fmt.Errorf("failed to parse deployment key: %w", err) + } + task.Start(ctx, req.Msg.DesiredModule.Name, dk) } if task.Err() != nil { diff --git a/cmd/ftl-provisioner-sandbox/provisioner.go b/cmd/ftl-provisioner-sandbox/provisioner.go index 567ec7bc69..8880d44452 100644 --- a/cmd/ftl-provisioner-sandbox/provisioner.go +++ b/cmd/ftl-provisioner-sandbox/provisioner.go @@ -88,7 +88,7 @@ func (c *SandboxProvisioner) Provision(ctx context.Context, req *connect.Request return nil, fmt.Errorf("provisioner already running: %s", token) } logger.Debugf("Starting task %s", token) - task.Start(ctx, module.Name) + task.Start(ctx, module.Name, module.Runtime.Deployment.DeploymentKey) return connect.NewResponse(&provisionerpb.ProvisionResponse{ Status: provisionerpb.ProvisionResponse_PROVISION_RESPONSE_STATUS_SUBMITTED, ProvisioningToken: token, diff --git a/internal/log/logger.go b/internal/log/logger.go index c8725ab115..6807954921 100644 --- a/internal/log/logger.go +++ b/internal/log/logger.go @@ -11,12 +11,14 @@ import ( "github.com/alecthomas/atomic" "github.com/alecthomas/types/optional" "github.com/benbjohnson/clock" + "github.com/block/ftl/internal/key" ) var _ Interface = (*Logger)(nil) const scopeKey = "scope" const moduleKey = "module" +const deploymentKey = "deployment" type Entry struct { Time time.Time `json:"-"` @@ -54,6 +56,10 @@ func (l Logger) Module(module string) *Logger { return l.Attrs(map[string]string{moduleKey: module}) } +func (l Logger) Deployment(deployment key.Deployment) *Logger { + return l.Attrs(map[string]string{deploymentKey: deployment.String()}) +} + // Attrs creates a new logger with the given attributes. func (l Logger) Attrs(attributes map[string]string) *Logger { attr := make(map[string]string, len(l.attributes)+len(attributes)) diff --git a/internal/provisioner/executor_test.go b/internal/provisioner/executor_test.go index af6f2382dd..cf7e299e29 100644 --- a/internal/provisioner/executor_test.go +++ b/internal/provisioner/executor_test.go @@ -53,7 +53,7 @@ func TestExecutorSelection(t *testing.T) { }, } - states, err := runner.Run(ctx, "test") + states, err := runner.Run(ctx) assert.NoError(t, err) assert.Equal(t, 2, len(states)) diff --git a/internal/provisioner/runner.go b/internal/provisioner/runner.go index 6c193e7de2..27ac44e622 100644 --- a/internal/provisioner/runner.go +++ b/internal/provisioner/runner.go @@ -8,6 +8,7 @@ import ( "github.com/alecthomas/atomic" "golang.org/x/sync/errgroup" + "github.com/block/ftl/internal/key" "github.com/block/ftl/internal/log" "github.com/block/ftl/internal/provisioner/state" ) @@ -41,8 +42,8 @@ type Runner struct { Stages []RunnerStage } -func (r *Runner) Run(ctx context.Context, module string) ([]state.State, error) { - logger := log.FromContext(ctx).Module(module) +func (r *Runner) Run(ctx context.Context) ([]state.State, error) { + logger := log.FromContext(ctx) for _, stage := range r.Stages { logger.Debugf("running stage %s", stage.Name) @@ -120,11 +121,12 @@ func (r *Runner) execute(ctx context.Context, stage *RunnerStage) ([]state.State return result, nil } -func (t *Task) Start(oldCtx context.Context, module string) { +func (t *Task) Start(oldCtx context.Context, module string, deployment key.Deployment) { ctx := context.WithoutCancel(oldCtx) - logger := log.FromContext(ctx).Module(module) + logger := log.FromContext(ctx).Module(module).Deployment(deployment) + ctx = log.ContextWithLogger(ctx, logger) go func() { - outputs, err := t.runner.Run(ctx, module) + outputs, err := t.runner.Run(ctx) if err != nil { logger.Errorf(err, "failed to execute provisioner") t.err.Store(err) From 80249c0ef592823cc9639f38c396761dead19ab1 Mon Sep 17 00:00:00 2001 From: Juho Makinen Date: Tue, 4 Mar 2025 16:44:53 +1100 Subject: [PATCH 3/5] chore: add timeline endpoint to the charts --- charts/ftl/templates/provisioner-deployment.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/charts/ftl/templates/provisioner-deployment.yaml b/charts/ftl/templates/provisioner-deployment.yaml index 35345139d8..9726ae01ab 100644 --- a/charts/ftl/templates/provisioner-deployment.yaml +++ b/charts/ftl/templates/provisioner-deployment.yaml @@ -44,6 +44,8 @@ spec: value: "/working" - name: FTL_CONTROLLER_ENDPOINT value: "http://{{ include "ftl.fullname" . }}-controller:{{ .Values.controller.port }}" + - name: FTL_TIMELINE_ENDPOINT + value: "http://{{ include "ftl.fullname" . }}-timeline:{{ .Values.timeline.port }}" - name: FTL_SCHEMA_ENDPOINT value: "http://{{ include "ftl.fullname" . }}-schema:{{ $.Values.schema.services.schema.port }}" - name: LOG_LEVEL From 49cfa9faa3a463f8970268555886a8af7c9642ec Mon Sep 17 00:00:00 2001 From: Juho Makinen Date: Tue, 4 Mar 2025 16:49:47 +1100 Subject: [PATCH 4/5] chore: log db provisioning in dev to timeline --- backend/provisioner/dev_provisioner.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/backend/provisioner/dev_provisioner.go b/backend/provisioner/dev_provisioner.go index f6417ca161..8ec98e07c0 100644 --- a/backend/provisioner/dev_provisioner.go +++ b/backend/provisioner/dev_provisioner.go @@ -33,11 +33,11 @@ func NewDevProvisioner(postgresPort int, mysqlPort int, recreate bool) *InMemPro } func provisionMysql(mysqlPort int, recreate bool) InMemResourceProvisionerFn { return func(ctx context.Context, changeset key.Changeset, deployment key.Deployment, res schema.Provisioned) (*schema.RuntimeElement, error) { - logger := log.FromContext(ctx) + logger := log.FromContext(ctx).Deployment(deployment) dbName := strcase.ToLowerSnake(deployment.Payload.Module) + "_" + strcase.ToLowerSnake(res.ResourceID()) - logger.Debugf("Provisioning mysql database: %s", dbName) + logger.Infof("Provisioning mysql database: %s", dbName) //nolint // We assume that the DB hsas already been started when running in dev mode mysqlDSN, err := dev.SetupMySQL(ctx, mysqlPort) @@ -125,10 +125,10 @@ func ProvisionMySQLForTest(ctx context.Context, moduleName string, id string) (s func provisionPostgres(postgresPort int, recreate bool) InMemResourceProvisionerFn { return func(ctx context.Context, changeset key.Changeset, deployment key.Deployment, resource schema.Provisioned) (*schema.RuntimeElement, error) { - logger := log.FromContext(ctx) + logger := log.FromContext(ctx).Deployment(deployment) dbName := strcase.ToLowerSnake(deployment.Payload.Module) + "_" + strcase.ToLowerSnake(resource.ResourceID()) - logger.Debugf("Provisioning postgres database: %s", dbName) + logger.Infof("Provisioning postgres database: %s", dbName) //nolint // We assume that the DB has already been started when running in dev mode postgresDSN := dsn.PostgresDSN("ftl", dsn.Port(postgresPort)) From 4886a7a23622c9542f6c672ea7c7108ba1422013 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Tue, 4 Mar 2025 06:00:44 +0000 Subject: [PATCH 5/5] chore(autofmt): Automated formatting --- internal/log/logger.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/log/logger.go b/internal/log/logger.go index 6807954921..11477bf13b 100644 --- a/internal/log/logger.go +++ b/internal/log/logger.go @@ -11,6 +11,7 @@ import ( "github.com/alecthomas/atomic" "github.com/alecthomas/types/optional" "github.com/benbjohnson/clock" + "github.com/block/ftl/internal/key" )