diff --git a/cmd/manager_store.go b/cmd/manager_store.go index 91bd7fbf7..c5787a28b 100644 --- a/cmd/manager_store.go +++ b/cmd/manager_store.go @@ -28,10 +28,12 @@ func newManagerStore(q *models.Queries, c *core.Core, m media.Store) *store { } } -// NextCampaigns retrieves active campaigns ready to be processed. -func (s *store) NextCampaigns(excludeIDs []int64) ([]*models.Campaign, error) { +// NextCampaigns retrieves active campaigns ready to be processed excluding +// campaigns that are also being processed. Additionally, it takes a map of campaignID:sentCount +// of campaigns that are being processed and updates them in the DB. +func (s *store) NextCampaigns(currentIDs []int64, sentCounts []int64) ([]*models.Campaign, error) { var out []*models.Campaign - err := s.queries.NextCampaigns.Select(&out, pq.Int64Array(excludeIDs)) + err := s.queries.NextCampaigns.Select(&out, pq.Int64Array(currentIDs), pq.Int64Array(sentCounts)) return out, err } @@ -58,6 +60,12 @@ func (s *store) UpdateCampaignStatus(campID int, status string) error { return err } +// UpdateCampaignStatus updates a campaign's status. +func (s *store) UpdateCampaignCounts(campID int, toSend int, sent int, lastSubID int) error { + _, err := s.queries.UpdateCampaignCounts.Exec(campID, toSend, sent, lastSubID) + return err +} + // GetAttachment fetches a media attachment blob. func (s *store) GetAttachment(mediaID int) (models.Attachment, error) { m, err := s.core.GetMedia(mediaID, "", s.media) diff --git a/internal/manager/manager.go b/internal/manager/manager.go index 86bba3ea0..bdf3cec3a 100644 --- a/internal/manager/manager.go +++ b/internal/manager/manager.go @@ -28,11 +28,12 @@ const ( // Store represents a data backend, such as a database, // that provides subscriber and campaign records. type Store interface { - NextCampaigns(excludeIDs []int64) ([]*models.Campaign, error) + NextCampaigns(currentIDs []int64, sentCounts []int64) ([]*models.Campaign, error) NextSubscribers(campID, limit int) ([]models.Subscriber, error) GetCampaign(campID int) (*models.Campaign, error) GetAttachment(mediaID int) (models.Attachment, error) UpdateCampaignStatus(campID int, status string) error + UpdateCampaignCounts(campID int, toSend int, sent int, lastSubID int) error CreateLink(url string) (string, error) BlocklistSubscriber(id int64) error DeleteSubscriber(id int64) error @@ -165,9 +166,9 @@ func New(cfg Config, store Store, notifCB models.AdminNotifCallback, i *i18n.I18 pipes: make(map[int]*pipe), tpls: make(map[int]*models.Template), links: make(map[string]string), - nextPipes: make(chan *pipe, cfg.Concurrency), - campMsgQ: make(chan CampaignMessage, cfg.Concurrency*2), - msgQ: make(chan models.Message, cfg.Concurrency), + nextPipes: make(chan *pipe, 1000), + campMsgQ: make(chan CampaignMessage, cfg.Concurrency*cfg.MessageRate*2), + msgQ: make(chan models.Message, cfg.Concurrency*cfg.MessageRate*2), slidingStart: time.Now(), } m.tplFuncs = m.makeGnericFuncMap() @@ -275,7 +276,10 @@ func (m *Manager) Run() { if has { // There are more subscribers to fetch. Queue again. - m.nextPipes <- p + select { + case m.nextPipes <- p: + default: + } } else { // Mark the pseudo counter that's added in makePipe() that is used // to force a wait on a pipe. @@ -388,7 +392,8 @@ func (m *Manager) scanCampaigns(tick time.Duration) { select { // Periodically scan the data source for campaigns to process. case <-t.C: - campaigns, err := m.store.NextCampaigns(m.getRunningCampaignIDs()) + ids, counts := m.getCurrentCampaigns() + campaigns, err := m.store.NextCampaigns(ids, counts) if err != nil { m.log.Printf("error fetching campaigns: %v", err) continue @@ -488,7 +493,12 @@ func (m *Manager) worker() { if err != nil { msg.pipe.OnError() } else { + id := uint64(msg.Subscriber.ID) + if id > msg.pipe.lastID.Load() { + msg.pipe.lastID.Store(uint64(msg.Subscriber.ID)) + } msg.pipe.rate.Incr(1) + msg.pipe.sent.Add(1) } } @@ -518,6 +528,29 @@ func (m *Manager) getRunningCampaignIDs() []int64 { return ids } +// getCurrentCampaigns returns the IDs of campaigns currently being processed +// and their sent counts. +func (m *Manager) getCurrentCampaigns() ([]int64, []int64) { + // Needs to return an empty slice in case there are no campaigns. + m.pipesMut.RLock() + defer m.pipesMut.RUnlock() + + var ( + ids = make([]int64, 0, len(m.pipes)) + counts = make([]int64, 0, len(m.pipes)) + ) + for _, p := range m.pipes { + ids = append(ids, int64(p.camp.ID)) + + // Get the sent counts for campaigns and reset them to 0 + // as in the database, they're stored cumulatively (sent += $newSent). + counts = append(counts, p.sent.Load()) + p.sent.Store(0) + } + + return ids, counts +} + // isCampaignProcessing checks if the campaign is being processed. func (m *Manager) isCampaignProcessing(id int) bool { m.pipesMut.RLock() diff --git a/internal/manager/pipe.go b/internal/manager/pipe.go index c1b36ee35..d97b9f592 100644 --- a/internal/manager/pipe.go +++ b/internal/manager/pipe.go @@ -14,8 +14,10 @@ type pipe struct { camp *models.Campaign rate *ratecounter.RateCounter wg *sync.WaitGroup - stopped atomic.Bool + sent atomic.Int64 + lastID atomic.Uint64 errors atomic.Uint64 + stopped atomic.Bool withErrors atomic.Bool m *Manager @@ -182,6 +184,11 @@ func (p *pipe) cleanup() { p.m.pipesMut.Unlock() }() + // Update campaign's "sent" count. + if err := p.m.store.UpdateCampaignCounts(p.camp.ID, 0, int(p.sent.Load()), int(p.lastID.Load())); err != nil { + p.m.log.Printf("error updating campaign counts (%s): %v", p.camp.Name, err) + } + // The campaign was auto-paused due to errors. if p.withErrors.Load() { if err := p.m.store.UpdateCampaignStatus(p.camp.ID, models.CampaignStatusPaused); err != nil { diff --git a/queries.sql b/queries.sql index 557227c63..a5fd1f28a 100644 --- a/queries.sql +++ b/queries.sql @@ -621,7 +621,7 @@ SELECT id, status, to_send, sent, started_at, updated_at -- that is, the total number of subscribers to be processed across all lists of a campaign. -- Thus, it has a sideaffect. -- In addition, it finds the max_subscriber_id, the upper limit across all lists of --- a campaign. This is used to fetch and slice subscribers for the campaign in next-subscriber-campaigns. +-- a campaign. This is used to fetch and slice subscribers for the campaign in next-campaign-subscribers. WITH camps AS ( -- Get all running campaigns and their template bodies (if the template's deleted, the default template body instead) SELECT campaigns.*, COALESCE(templates.body, (SELECT body FROM templates WHERE is_default = true LIMIT 1)) AS template_body @@ -666,6 +666,12 @@ counts AS ( ) GROUP BY camps.id ), +updateCounts AS ( + WITH uc (campaign_id, sent_count) AS (SELECT * FROM unnest($1::INT[], $2::INT[])) + UPDATE campaigns + SET sent = sent + uc.sent_count + FROM uc WHERE campaigns.id = uc.campaign_id +), u AS ( -- For each campaign, update the to_send count and set the max_subscriber_id. UPDATE campaigns AS ca @@ -767,9 +773,7 @@ subs AS ( ), u AS ( UPDATE campaigns - SET last_subscriber_id = (SELECT MAX(id) FROM subs), - sent = sent + (SELECT COUNT(id) FROM subs), - updated_at = NOW() + SET last_subscriber_id = (SELECT MAX(id) FROM subs), updated_at = NOW() WHERE (SELECT COUNT(id) FROM subs) > 0 AND id=$1 ) SELECT * FROM subs; @@ -829,8 +833,8 @@ INSERT INTO campaign_lists (campaign_id, list_id, list_name) -- name: update-campaign-counts UPDATE campaigns SET to_send=(CASE WHEN $2 != 0 THEN $2 ELSE to_send END), - sent=(CASE WHEN $3 != 0 THEN $3 ELSE sent END), - last_subscriber_id=(CASE WHEN $4 != 0 THEN $4 ELSE last_subscriber_id END), + sent=sent+$3, + last_subscriber_id=(CASE WHEN $4 > 0 THEN $4 ELSE to_send END), updated_at=NOW() WHERE id=$1;