Skip to content

Commit

Permalink
Improve large feed deletion
Browse files Browse the repository at this point in the history
First patch to avoid the app hanging when deleting large feeds
  • Loading branch information
fguillot committed Mar 17, 2021
1 parent 89c1b3b commit 51fb949
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 72 deletions.
29 changes: 20 additions & 9 deletions storage/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"database/sql"
"errors"
"fmt"
"runtime"

"miniflux.app/config"
"miniflux.app/logger"
"miniflux.app/model"
)

Expand Down Expand Up @@ -353,21 +355,30 @@ func (s *Storage) UpdateFeedError(feed *model.Feed) (err error) {
return nil
}

// RemoveFeed removes a feed.
// RemoveFeed removes a feed and all entries.
// This operation can takes time if the feed has lot of entries.
func (s *Storage) RemoveFeed(userID, feedID int64) error {
query := `DELETE FROM feeds WHERE id = $1 AND user_id = $2`
result, err := s.db.Exec(query, feedID, userID)
rows, err := s.db.Query(`SELECT id FROM entries WHERE user_id=$1 AND feed_id=$2`, userID, feedID)
if err != nil {
return fmt.Errorf(`store: unable to remove feed #%d: %v`, feedID, err)
return fmt.Errorf(`store: unable to get user feed entries: %v`, err)
}
defer rows.Close()

count, err := result.RowsAffected()
if err != nil {
return fmt.Errorf(`store: unable to remove feed #%d: %v`, feedID, err)
for rows.Next() {
var entryID int64
if err := rows.Scan(&entryID); err != nil {
return fmt.Errorf(`store: unable to read user feed entry ID: %v`, err)
}

logger.Debug(`[FEED DELETION] Deleting entry #%d of feed #%d for user #%d (%d GoRoutines)`, entryID, feedID, userID, runtime.NumGoroutine())

if _, err := s.db.Exec(`DELETE FROM entries WHERE id=$1 AND user_id=$2`, entryID, userID); err != nil {
return fmt.Errorf(`store: unable to delete user feed entries #%d: %v`, entryID, err)
}
}

if count == 0 {
return errors.New(`store: no feed has been removed`)
if _, err := s.db.Exec(`DELETE FROM feeds WHERE id=$1`, feedID); err != nil {
return fmt.Errorf(`store: unable to delete feed #%d: %v`, feedID, err)
}

return nil
Expand Down
90 changes: 27 additions & 63 deletions storage/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,13 +428,39 @@ func (s *Storage) RemoveUser(userID int64) error {
// RemoveUserAsync deletes user data without locking the database.
func (s *Storage) RemoveUserAsync(userID int64) {
go func() {
deleteUserFeeds(s.db, userID)
if err := s.deleteUserFeeds(userID); err != nil {
logger.Error(`%v`, err)
return
}

s.db.Exec(`DELETE FROM users WHERE id=$1`, userID)
s.db.Exec(`DELETE FROM integrations WHERE user_id=$1`, userID)

logger.Debug(`[MASS DELETE] User #%d has been deleted (%d GoRoutines)`, userID, runtime.NumGoroutine())
}()
}

func (s *Storage) deleteUserFeeds(userID int64) error {
rows, err := s.db.Query(`SELECT id FROM feeds WHERE user_id=$1`, userID)
if err != nil {
return fmt.Errorf(`store: unable to get user feeds: %v`, err)
}
defer rows.Close()

for rows.Next() {
var feedID int64
rows.Scan(&feedID)

logger.Debug(`[USER DELETION] Deleting feed #%d for user #%d (%d GoRoutines)`, feedID, userID, runtime.NumGoroutine())

if err := s.RemoveFeed(userID, feedID); err != nil {
return err
}
}

return nil
}

// Users returns all users.
func (s *Storage) Users() (model.Users, error) {
query := `
Expand Down Expand Up @@ -538,65 +564,3 @@ func hashPassword(password string) (string, error) {
bytes, err := bcrypt.GenerateFromPassword([]byte(password), bcrypt.DefaultCost)
return string(bytes), err
}

func deleteUserFeeds(db *sql.DB, userID int64) {
query := `SELECT id FROM feeds WHERE user_id=$1`
rows, err := db.Query(query, userID)
if err != nil {
logger.Error(`store: unable to get user feeds: %v`, err)
return
}
defer rows.Close()

var feedIDs []int64
for rows.Next() {
var feedID int64
rows.Scan(&feedID)
feedIDs = append(feedIDs, feedID)
}

worker := func(jobs <-chan int64, results chan<- bool) {
for feedID := range jobs {
logger.Debug(`[MASS DELETE] Deleting feed #%d for user #%d (%d GoRoutines)`, feedID, userID, runtime.NumGoroutine())
deleteUserEntries(db, userID, feedID)
db.Exec(`DELETE FROM feeds WHERE id=$1`, feedID)
results <- true
}
}

numJobs := len(feedIDs)
jobs := make(chan int64, numJobs)
results := make(chan bool, numJobs)

for w := 0; w < 2; w++ {
go worker(jobs, results)
}

for j := 0; j < numJobs; j++ {
jobs <- feedIDs[j]
}
close(jobs)

for a := 1; a <= numJobs; a++ {
<-results
}
}

func deleteUserEntries(db *sql.DB, userID int64, feedID int64) {
rows, err := db.Query(`SELECT id FROM entries WHERE user_id=$1 AND feed_id=$2`, userID, feedID)
if err != nil {
logger.Error(`store: unable to get user feed entries: %v`, err)
return
}
defer rows.Close()

for rows.Next() {
var entryID int64
rows.Scan(&entryID)

logger.Debug(`[MASS DELETE] Deleting entry #%d for user #%d (%d GoRoutines)`, entryID, userID, runtime.NumGoroutine())

db.Exec(`DELETE FROM enclosures WHERE entry_id=$1 AND user_id=$2`, entryID, userID)
db.Exec(`DELETE FROM entries WHERE id=$1 AND user_id=$2`, entryID, userID)
}
}

0 comments on commit 51fb949

Please # to comment.