Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

email/exporter: Add email.Exporter gRPC service #8017

Open
wants to merge 1 commit into
base: add-email-pardot
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 161 additions & 0 deletions email/exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package email

import (
"context"
"errors"
"sync"

"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/letsencrypt/boulder/core"
emailpb "github.com/letsencrypt/boulder/email/proto"
berrors "github.com/letsencrypt/boulder/errors"
blog "github.com/letsencrypt/boulder/log"
)

const (
// five is the number of concurrent workers processing the email queue. This
// number was chosen specifically to match the number of concurrent
// connections allowed by the Pardot API.
five = 5

// queueCap enforces a maximum stack size to prevent unbounded growth.
queueCap = 10000
)

var ErrQueueFull = errors.New("email-exporter queue is full")

// ExporterImpl implements the gRPC server and processes email exports.
type ExporterImpl struct {
emailpb.UnsafeExporterServer

sync.Mutex
drainWG sync.WaitGroup
wake *sync.Cond

limiter *rate.Limiter
toSend []string
client PardotClient
emailsHandledCounter prometheus.Counter
log blog.Logger
}

var _ emailpb.ExporterServer = (*ExporterImpl)(nil)

// NewExporterImpl creates a new ExporterImpl.
func NewExporterImpl(client PardotClient, perDayLimit float64, scope prometheus.Registerer, logger blog.Logger) *ExporterImpl {
// This limiter enforces the daily Pardot API limit and restricts
// concurrency to the maximum of 5 requests specified in their
// documentation. For more details see:
// https://developer.salesforce.com/docs/marketing/pardot/guide/overview.html?q=rate%20limits
limiter := rate.NewLimiter(rate.Limit(perDayLimit/86400.0), five)

emailsHandledCounter := prometheus.NewCounter(prometheus.CounterOpts{
Name: "email_exporter_emails_handled",
Help: "Total number of emails handled by the email exporter",
})
scope.MustRegister(emailsHandledCounter)

impl := &ExporterImpl{
limiter: limiter,
toSend: make([]string, 0, queueCap),
client: client,
emailsHandledCounter: emailsHandledCounter,
log: logger,
}
impl.wake = sync.NewCond(&impl.Mutex)

queueGauge := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "email_exporter_queue_length",
Help: "Current length of the email export queue",
}, func() float64 {
impl.Lock()
defer impl.Unlock()
return float64(len(impl.toSend))
})
scope.MustRegister(queueGauge)

return impl
}

// SendContacts enqueues the provided email addresses. If the queue cannot
// accommodate the new emails, an ErrQueueFull is returned.
func (impl *ExporterImpl) SendContacts(ctx context.Context, req *emailpb.SendContactsRequest) (*emptypb.Empty, error) {
if core.IsAnyNilOrZero(req, req.Emails) {
return nil, berrors.InternalServerError("Incomplete UpsertEmails request")
}

impl.Lock()
spotsLeft := queueCap - len(impl.toSend)
if spotsLeft < len(req.Emails) {
return nil, ErrQueueFull
}
impl.toSend = append(impl.toSend, req.Emails...)
impl.Unlock()
// Wake waiting workers to process the new emails.
impl.wake.Broadcast()

return &emptypb.Empty{}, nil
}

// Start begins asynchronous processing of the email queue. When the parent
// daemonCtx is cancelled the queue will be drained and the workers will exit.
func (impl *ExporterImpl) Start(daemonCtx context.Context) {
go func() {
<-daemonCtx.Done()
impl.Lock()
// Wake waiting workers to exit.
impl.wake.Broadcast()
impl.Unlock()
}()

worker := func() {
defer impl.drainWG.Done()
for {
impl.Lock()

for len(impl.toSend) == 0 && daemonCtx.Err() == nil {
// Wait for the queue to be updated or the daemon to exit.
impl.wake.Wait()
}

if len(impl.toSend) == 0 && daemonCtx.Err() != nil {
// No more emails to process, exit.
impl.Unlock()
return
}

// Dequeue and dispatch an email.
last := len(impl.toSend) - 1
email := impl.toSend[last]
impl.toSend = impl.toSend[:last]
impl.Unlock()

err := impl.limiter.Wait(daemonCtx)
if err != nil {
if !errors.Is(err, context.Canceled) {
impl.log.Errf("Unexpected limiter.Wait() error: %s", err)
continue
}
}

err = impl.client.SendContact(email)
if err != nil {
impl.log.Errf("Sending Contact to Pardot: %s", err)
}
impl.emailsHandledCounter.Inc()
}
}

for range five {
impl.drainWG.Add(1)
go worker()
}
}

// Drain blocks until all workers have finished processing the email queue.
func (impl *ExporterImpl) Drain() {
impl.drainWG.Wait()
}
123 changes: 123 additions & 0 deletions email/exporter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package email

import (
"context"
"fmt"
"sync"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"

emailpb "github.com/letsencrypt/boulder/email/proto"
blog "github.com/letsencrypt/boulder/log"
"github.com/letsencrypt/boulder/test"
)

var ctx = context.Background()

// MockPardotClientImpl is a mock implementation of PardotClient.
type MockPardotClientImpl struct {
sync.Mutex
CreatedContacts []string
}

// NewMockPardotClientImpl returns a MockPardotClientImpl, implementing the
// PardotClient interface. Both refer to the same instance, with the interface
// for mock interaction and the struct for state inspection and modification.
func NewMockPardotClientImpl() (PardotClient, *MockPardotClientImpl) {
mockImpl := &MockPardotClientImpl{
CreatedContacts: []string{},
}
return mockImpl, mockImpl
}

// SendContact adds an email to CreatedContacts.
func (m *MockPardotClientImpl) SendContact(email string) error {
m.Lock()
defer m.Unlock()

m.CreatedContacts = append(m.CreatedContacts, email)
return nil
}

func (m *MockPardotClientImpl) getCreatedContacts() []string {
m.Lock()
defer m.Unlock()
// Return a copy to avoid race conditions.
return append([]string(nil), m.CreatedContacts...)
}

func setup() (*ExporterImpl, *MockPardotClientImpl, func(), func()) {
mockClient, clientImpl := NewMockPardotClientImpl()
logger := blog.NewMock()
scope := prometheus.NewRegistry()
exporter := NewExporterImpl(mockClient, 1000000, scope, logger)

daemonCtx, cancel := context.WithCancel(context.Background())

return exporter, clientImpl,
func() { exporter.Start(daemonCtx) },
func() {
cancel()
exporter.Drain()
}
}

func TestSendContacts(t *testing.T) {
t.Parallel()

exporter, clientImpl, start, cleanup := setup()
start()
defer cleanup()

_, err := exporter.SendContacts(ctx, &emailpb.SendContactsRequest{
Emails: []string{"test@example.com", "user@example.com"},
})

// Wait for the queue to be processed.
time.Sleep(100 * time.Millisecond)

test.AssertNotError(t, err, "Error creating contacts")
test.AssertEquals(t, 2, len(clientImpl.getCreatedContacts()))
}

func TestSendContactsQueueFull(t *testing.T) {
t.Parallel()

exporter, _, _, _ := setup()

// Fill the queue.
exporter.Lock()
exporter.toSend = make([]string, queueCap-1)
exporter.Unlock()

_, err := exporter.SendContacts(ctx, &emailpb.SendContactsRequest{
Emails: []string{"test@example.com", "user@example.com"},
})
test.AssertErrorIs(t, err, ErrQueueFull)
}

func TestSendContactsQueueDrains(t *testing.T) {
t.Parallel()

exporter, clientImpl, start, cleanup := setup()
start()

// Add 100 emails to the queue.
var emails []string
for i := range 100 {
emails = append(emails, fmt.Sprintf("test@%d.example.com", i))
}

_, err := exporter.SendContacts(ctx, &emailpb.SendContactsRequest{
Emails: emails,
})
test.AssertNotError(t, err, "Error creating contacts")

// Drain the queue.
cleanup()

// Check that the queue was drained.
test.AssertEquals(t, 100, len(clientImpl.getCreatedContacts()))
}
Loading
Loading