Skip to content

Files

Latest commit

410e641 · Jul 17, 2024

History

History

worker

Folders and files

NameName
Last commit message
Last commit date

parent directory

..
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Worker Module

ci go report codecov Deps PkgGoDev

Worker module based on sync.

Installation

go get github.com/ankorstore/yokai/worker

Documentation

This module provides a WorkerPool, in which you can register Worker implementations, then start and stop them.

The WorkerPool can be configured to:

  • defer the start of all workers by a threshold in seconds: 0 by default (start immediately)
  • attempt a maximum number of runs in case of failures: 1 by default (no restarts)

The Worker executions:

  • have a unique identifier
  • have automatic panic recovery
  • are automatically logged
  • automatically generate metrics

Workers

This module provides a Worker interface that you can implement to define your own workers, for example:

package workers

import (
	"context"

	"github.com/ankorstore/yokai/worker"
)

// ClassicWorker is a stateless example worker.
type ClassicWorker struct{}

// NewClassicWorker returns a pointer to a new ClassicWorker.
func NewClassicWorker() *ClassicWorker {
	return new(ClassicWorker)
}

// Name returns the name of the worker: "classic-worker".
func (w *ClassicWorker) Name() string {
	const name = "classic-worker"

	return name
}

// Run emits a single "run" info log through the contextual logger, then
// returns nil.
func (w *ClassicWorker) Run(ctx context.Context) error {
	logger := worker.CtxLogger(ctx)
	logger.Info().Msg("run")

	return nil
}

// CancellableWorker is a stateless example worker whose Run method reacts to
// context cancellation.
type CancellableWorker struct{}

// NewCancellableWorker returns a pointer to a new CancellableWorker.
func NewCancellableWorker() *CancellableWorker {
	return new(CancellableWorker)
}

// Name returns the name of the worker: "cancellable-worker".
func (w *CancellableWorker) Name() string {
	const name = "cancellable-worker"

	return name
}

// Run executes the worker logic, unless ctx is already cancelled, in which
// case it executes the cancel logic instead. It returns the error of
// whichever of the two was executed.
func (w *CancellableWorker) Run(ctx context.Context) error {
	logger := worker.CtxLogger(ctx)

	// both branches return, so no loop is needed around the select
	select {
	// when the WorkerPool stops, the ctx cancellation is forwarded to the workers
	case <-ctx.Done():
		logger.Info().Msg("cancel")

		return w.cancel()
	default:
		logger.Info().Msg("run")

		return w.run(ctx)
	}
}

// run is the placeholder for the worker logic. It returns nil so that the
// example compiles as written.
func (w *CancellableWorker) run(ctx context.Context) error {
	// your worker logic

	return nil
}

// cancel is the placeholder for the worker cancel logic. It returns nil so
// that the example compiles as written.
func (w *CancellableWorker) cancel() error {
	// your worker cancel logic, for example graceful shutdown

	return nil
}

Notes:

  • to perform more complex tasks, you can inject dependencies into your worker implementations (ex: database, cache, etc.)
  • it is recommended to design your workers with a single responsibility

WorkerPool

You can create a WorkerPool instance with the DefaultWorkerPoolFactory, register your Worker implementations, and start them:

package main

import (
	"context"

	"github.com/ankorstore/yokai/worker"
	"path/to/workers"
)

// main creates a WorkerPool with global and per-worker options, starts it,
// reads the execution reports, then stops it.
func main() {
	// create the pool (the second value returned by Create is discarded here for brevity)
	pool, _ := worker.NewDefaultWorkerPoolFactory().Create(
		// will defer the start of all workers by 1 second
		worker.WithGlobalDeferredStartThreshold(1),
		// will run failing workers 2 times at most
		worker.WithGlobalMaxExecutionsAttempts(2),
		// registers the ClassicWorker, with a deferred start of 3 seconds
		worker.WithWorker(workers.NewClassicWorker(), worker.WithDeferredStartThreshold(3)),
		// registers the CancellableWorker, with 4 runs at most
		worker.WithWorker(workers.NewCancellableWorker(), worker.WithMaxExecutionsAttempts(4)),
	)

	// start the pool
	pool.Start(context.Background())

	// get all workers execution reports, in real time
	executions := pool.Executions()

	// stop the pool (will forward context cancellation to each worker)
	pool.Stop()

	// get a specific worker execution report, after pool stop
	execution, _ := pool.Execution("cancellable-worker")

	// Go rejects locals that are declared and not used: reference the reports
	// explicitly, and replace this line with your own handling of them
	_, _ = executions, execution
}

Logging

You can use the CtxLogger() function to retrieve the contextual log.Logger from your workers, and emit correlated logs.

The worker executions are logged, with the following fields added automatically to each log record:

  • worker: worker name
  • workerExecutionID: worker execution id

For example:

package main

import (
	"context"

	"github.com/ankorstore/yokai/worker"
)

// LoggingWorker is a stateless example worker that emits contextual logs.
type LoggingWorker struct{}

// NewLoggingWorker returns a pointer to a new LoggingWorker.
func NewLoggingWorker() *LoggingWorker {
	return new(LoggingWorker)
}

// Name returns the name of the worker: "logging-worker".
func (w *LoggingWorker) Name() string {
	const name = "logging-worker"

	return name
}

// Run emits one info log carrying the contextual worker execution id and
// worker name, then returns nil.
func (w *LoggingWorker) Run(ctx context.Context) error {
	// log the current worker execution id and name: the arguments are passed
	// in the same order as the verbs of the format string
	worker.CtxLogger(ctx).Info().Msgf(
		"execution %s for worker %s",
		worker.CtxWorkerExecutionId(ctx), // contextual worker execution id
		worker.CtxWorkerName(ctx),        // contextual worker name
	)

	return nil
}

// main registers the LoggingWorker in a new WorkerPool, then starts the pool.
func main() {
	// build the pool through the default factory, registering the LoggingWorker
	factory := worker.NewDefaultWorkerPoolFactory()
	pool, _ := factory.Create(worker.WithWorker(NewLoggingWorker()))

	// start the pool with a background context
	ctx := context.Background()
	pool.Start(ctx)
}

Tracing

You can use the CtxTracer() function to retrieve the contextual tracer from your workers, and emit correlated spans: they will have the Worker and WorkerExecutionID attributes added with respectively the worker name and execution id.

This module provides the AnnotateTracerProvider function, to extend a TracerProvider so that it automatically adds the current worker information to the spans emitted during a worker execution:

package main

import (
	"context"

	"github.com/ankorstore/yokai/worker"
	"go.opentelemetry.io/otel/trace"
)

// TracingWorker is a stateless example worker that emits trace spans.
type TracingWorker struct{}

// NewTracingWorker returns a pointer to a new TracingWorker.
func NewTracingWorker() *TracingWorker {
	return new(TracingWorker)
}

// Name returns the name of the worker: "tracing-worker".
func (w *TracingWorker) Name() string {
	const name = "tracing-worker"

	return name
}

// Run starts a span named "some span" with the contextual tracer, ends it
// when the method returns, and returns nil.
func (w *TracingWorker) Run(ctx context.Context) error {
	// retrieve the contextual tracer and emit some trace span
	tracer := worker.CtxTracer(ctx)
	_, span := tracer.Start(ctx, "some span")
	defer span.End()

	return nil
}

// main annotates a tracer provider with the worker information, registers the
// TracingWorker in a new WorkerPool, then starts the pool.
func main() {
	// tracer provider
	// NOTE(review): GetTracerProvider is usually exported by go.opentelemetry.io/otel
	// rather than go.opentelemetry.io/otel/trace — confirm the import used by this example.
	provider := trace.GetTracerProvider()

	// annotate the tracer provider
	worker.AnnotateTracerProvider(provider)

	// build the pool through the default factory, registering the TracingWorker
	factory := worker.NewDefaultWorkerPoolFactory()
	pool, _ := factory.Create(worker.WithWorker(NewTracingWorker()))

	// start the pool with a background context
	ctx := context.Background()
	pool.Start(ctx)
}

Metrics

The WorkerPool automatically generates metrics about:

  • started workers
  • restarted workers
  • workers stopped with success
  • workers stopped with error

To enable those metrics in a registry, simply call Register on the WorkerMetrics of the WorkerPool:

package main

import (
	"context"

	"github.com/ankorstore/yokai/worker"
	"github.com/prometheus/client_golang/prometheus"
)

// main creates a metrics registry and a WorkerPool, registers the pool
// metrics in the registry, then starts the pool.
func main() {
	// metrics registry
	registry := prometheus.NewRegistry()

	// build the pool through the default factory
	factory := worker.NewDefaultWorkerPoolFactory()
	pool, _ := factory.Create()

	// register the pool metrics in the registry
	metrics := pool.Metrics()
	metrics.Register(registry)

	// start the pool with a background context
	ctx := context.Background()
	pool.Start(ctx)
}

Healthcheck

This module provides a WorkerProbe, compatible with the healthcheck module:

package main

import (
	"context"

	yokaihc "github.com/ankorstore/yokai/healthcheck"
	"github.com/ankorstore/yokai/worker"
	"github.com/ankorstore/yokai/worker/healthcheck"
)

// main creates a WorkerPool, builds a healthcheck checker holding a
// WorkerProbe for that pool, starts the pool, then runs a readiness check.
func main() {
	// create the pool
	pool, _ := worker.NewDefaultWorkerPoolFactory().Create()

	// create the checker with the worker probe
	checker, _ := yokaihc.NewDefaultCheckerFactory().Create(
		yokaihc.WithProbe(healthcheck.NewWorkerProbe(pool)),
	)

	// start the pool
	pool.Start(context.Background())

	// run the checker
	res, _ := checker.Check(context.Background(), yokaihc.Readiness)

	// Go rejects locals that are declared and not used: reference the result
	// explicitly, and replace this line with your own handling of it
	_ = res
}

This probe is successful if all the executions statuses of the WorkerPool are healthy.