-
Notifications
You must be signed in to change notification settings - Fork 235
/
Copy pathreset.go
258 lines (242 loc) · 8.86 KB
/
reset.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
package reset
import (
"context"
_ "embed"
"fmt"
"io"
"os"
"strconv"
"strings"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/network"
"github.com/docker/docker/errdefs"
"github.com/go-errors/errors"
"github.com/jackc/pgconn"
"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v4"
"github.com/spf13/afero"
"github.com/supabase/cli/internal/db/start"
"github.com/supabase/cli/internal/gen/keys"
"github.com/supabase/cli/internal/migration/apply"
"github.com/supabase/cli/internal/migration/repair"
"github.com/supabase/cli/internal/seed/buckets"
"github.com/supabase/cli/internal/utils"
"github.com/supabase/cli/pkg/migration"
"github.com/supabase/cli/pkg/vault"
)
// Run resets the database targeted by config. For a remote database it asks
// for explicit confirmation before dropping user schemas and replaying
// migrations; for the local stack it recreates the database, reseeds storage
// buckets, and restarts dependent containers.
//
// version, when non-empty, must be a numeric migration version with a
// matching migration file; migrations are then applied only up to it.
func Run(ctx context.Context, version string, config pgconn.Config, fsys afero.Fs, options ...func(*pgx.ConnConfig)) error {
	if len(version) > 0 {
		// Validate the target version up front: it must be numeric and a
		// corresponding migration file must exist on disk.
		if _, err := strconv.Atoi(version); err != nil {
			return errors.New(repair.ErrInvalidVersion)
		}
		if _, err := repair.GetMigrationFile(version, fsys); err != nil {
			return err
		}
	}
	if !utils.IsLocalDatabase(config) {
		// Destructive remote operation: require an explicit yes (default no).
		msg := "Do you want to reset the remote database?"
		if shouldReset, err := utils.NewConsole().PromptYesNo(ctx, msg, false); err != nil {
			return err
		} else if !shouldReset {
			return errors.New(context.Canceled)
		}
		return resetRemote(ctx, version, config, fsys, options...)
	}
	// Config file is loaded before parsing --linked or --local flags
	if err := utils.AssertSupabaseDbIsRunning(); err != nil {
		return err
	}
	// Reset postgres database because extensions (pg_cron, pg_net) require postgres
	if err := resetDatabase(ctx, version, fsys, options...); err != nil {
		return err
	}
	// Seed objects from supabase/buckets directory
	if resp, err := utils.Docker.ContainerInspect(ctx, utils.StorageId); err == nil {
		// Storage may still be coming up after the reset; wait until it
		// reports healthy before seeding buckets. Inspect errors are
		// deliberately ignored: no storage container means nothing to seed.
		if resp.State.Health == nil || resp.State.Health.Status != types.Healthy {
			if err := start.WaitForHealthyService(ctx, 30*time.Second, utils.StorageId); err != nil {
				return err
			}
		}
		if err := buckets.Run(ctx, "", false, fsys); err != nil {
			return err
		}
	}
	branch := keys.GetGitBranch(fsys)
	fmt.Fprintln(os.Stderr, "Finished "+utils.Aqua("supabase db reset")+" on branch "+utils.Aqua(branch)+".")
	return nil
}
// resetDatabase rebuilds the local database, choosing the reset strategy
// based on the configured Postgres major version.
func resetDatabase(ctx context.Context, version string, fsys afero.Fs, options ...func(*pgx.ConnConfig)) error {
	fmt.Fprintln(os.Stderr, "Resetting local database"+toLogMessage(version))
	if utils.Config.Db.MajorVersion > 14 {
		return resetDatabase15(ctx, version, fsys, options...)
	}
	return resetDatabase14(ctx, version, fsys, options...)
}
// toLogMessage renders the optional target version as a log-line suffix.
func toLogMessage(version string) string {
	if version == "" {
		return "..."
	}
	return " to version: " + version
}
// resetDatabase14 resets a Postgres 14 cluster in place: drop and recreate
// the databases, bootstrap the schema, restart the container, then reapply
// all migrations and seed files.
func resetDatabase14(ctx context.Context, version string, fsys afero.Fs, options ...func(*pgx.ConnConfig)) error {
	if err := recreateDatabase(ctx, options...); err != nil {
		return err
	}
	if err := initDatabase(ctx, options...); err != nil {
		return err
	}
	if err := RestartDatabase(ctx, os.Stderr); err != nil {
		return err
	}
	db, err := utils.ConnectLocalPostgres(ctx, pgconn.Config{}, options...)
	if err != nil {
		return err
	}
	defer db.Close(context.Background())
	return apply.MigrateAndSeed(ctx, version, db, fsys)
}
// resetDatabase15 wipes a Postgres 15+ cluster by deleting the container and
// its data volume, then starting a fresh container and replaying migrations.
func resetDatabase15(ctx context.Context, version string, fsys afero.Fs, options ...func(*pgx.ConnConfig)) error {
	// Throw away the old container together with its data volume.
	if err := utils.Docker.ContainerRemove(ctx, utils.DbId, container.RemoveOptions{Force: true}); err != nil {
		return errors.Errorf("failed to remove container: %w", err)
	}
	if err := utils.Docker.VolumeRemove(ctx, utils.DbId, true); err != nil {
		return errors.Errorf("failed to remove volume: %w", err)
	}
	fmt.Fprintln(os.Stderr, "Recreating database...")
	containerCfg := start.NewContainerConfig()
	hostCfg := start.NewHostConfig()
	netCfg := network.NetworkingConfig{
		EndpointsConfig: map[string]*network.EndpointSettings{
			utils.NetId: {Aliases: utils.DbAliases},
		},
	}
	if _, err := utils.DockerStart(ctx, containerCfg, hostCfg, netCfg, utils.DbId); err != nil {
		return err
	}
	if err := start.WaitForHealthyService(ctx, start.HealthTimeout, utils.DbId); err != nil {
		return err
	}
	if err := start.SetupLocalDatabase(ctx, version, fsys, os.Stderr, options...); err != nil {
		return err
	}
	fmt.Fprintln(os.Stderr, "Restarting containers...")
	return restartServices(ctx)
}
// initDatabase connects as supabase_admin and bootstraps the Postgres 14
// schema on the freshly recreated databases.
func initDatabase(ctx context.Context, options ...func(*pgx.ConnConfig)) error {
	admin, err := utils.ConnectLocalPostgres(ctx, pgconn.Config{User: "supabase_admin"}, options...)
	if err != nil {
		return err
	}
	defer admin.Close(context.Background())
	return start.InitSchema14(ctx, admin)
}
// recreateDatabase drops and recreates the postgres and _supabase databases.
// It connects to template1 so the drops can succeed, after forcing every
// connected client off first.
func recreateDatabase(ctx context.Context, options ...func(*pgx.ConnConfig)) error {
	conn, err := utils.ConnectLocalPostgres(ctx, pgconn.Config{User: "supabase_admin", Database: "template1"}, options...)
	if err != nil {
		return err
	}
	defer conn.Close(context.Background())
	if err := DisconnectClients(ctx, conn); err != nil {
		return err
	}
	// Roles are deliberately left alone: they are cluster level entities.
	// Use stop && start to reset them instead.
	rebuild := migration.MigrationFile{
		Statements: []string{
			"DROP DATABASE IF EXISTS postgres WITH (FORCE)",
			"CREATE DATABASE postgres WITH OWNER postgres",
			"DROP DATABASE IF EXISTS _supabase WITH (FORCE)",
			"CREATE DATABASE _supabase WITH OWNER postgres",
		},
	}
	return rebuild.ExecBatch(ctx, conn)
}
const (
	// TERMINATE_BACKENDS kills every backend connected to the postgres or
	// _supabase databases.
	TERMINATE_BACKENDS = "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname IN ('postgres', '_supabase')"
	// COUNT_REPLICATION_SLOTS counts replication slots still attached to
	// those databases, i.e. WAL senders that have not yet disconnected.
	COUNT_REPLICATION_SLOTS = "SELECT COUNT(*) FROM pg_replication_slots WHERE database IN ('postgres', '_supabase')"
)
// DisconnectClients blocks new connections to the postgres and _supabase
// databases, terminates existing backends, and waits until all replication
// slots on those databases have been released.
func DisconnectClients(ctx context.Context, conn *pgx.Conn) error {
	// Must be executed separately because looping in transaction is unsupported
	// https://dba.stackexchange.com/a/11895
	batch := migration.MigrationFile{
		Statements: []string{
			"ALTER DATABASE postgres ALLOW_CONNECTIONS false",
			"ALTER DATABASE _supabase ALLOW_CONNECTIONS false",
			TERMINATE_BACKENDS,
		},
	}
	if err := batch.ExecBatch(ctx, conn); err != nil {
		// "database does not exist" means there is nothing to disconnect;
		// any other Postgres error is fatal.
		var pgErr *pgconn.PgError
		if errors.As(err, &pgErr) && pgErr.Code != pgerrcode.InvalidCatalogName {
			return errors.Errorf("failed to disconnect clients: %w", err)
		}
	}
	// Wait for WAL senders to drop their replication slots
	retry := start.NewBackoffPolicy(ctx, 10*time.Second)
	checkSlots := func() error {
		var active int
		if err := conn.QueryRow(ctx, COUNT_REPLICATION_SLOTS).Scan(&active); err != nil {
			// Query failures will not resolve by retrying; bail out now.
			wrapped := errors.Errorf("failed to count replication slots: %w", err)
			return &backoff.PermanentError{Err: wrapped}
		}
		if active > 0 {
			return errors.Errorf("replication slots still active: %d", active)
		}
		return nil
	}
	return backoff.Retry(checkSlots, retry)
}
// RestartDatabase restarts the database container and every dependent
// service, blocking until the database reports healthy again. Progress is
// written to w.
func RestartDatabase(ctx context.Context, w io.Writer) error {
	fmt.Fprintln(w, "Restarting containers...")
	// Some extensions must be manually restarted after pg_terminate_backend
	// Ref: https://github.com/citusdata/pg_cron/issues/99
	if err := utils.Docker.ContainerRestart(ctx, utils.DbId, container.StopOptions{}); err != nil {
		return errors.Errorf("failed to restart container: %w", err)
	}
	if err := start.WaitForHealthyService(ctx, start.HealthTimeout, utils.DbId); err != nil {
		return err
	}
	return restartServices(ctx)
}
func restartServices(ctx context.Context) error {
// No need to restart PostgREST because it automatically reconnects and listens for schema changes
services := listServicesToRestart()
result := utils.WaitAll(services, func(id string) error {
if err := utils.Docker.ContainerRestart(ctx, id, container.StopOptions{}); err != nil && !errdefs.IsNotFound(err) {
return errors.Errorf("failed to restart %s: %w", id, err)
}
return nil
})
// Do not wait for service healthy as those services may be excluded from starting
return errors.Join(result...)
}
// listServicesToRestart returns the ids of containers that must be bounced
// after the database has been reset.
func listServicesToRestart() []string {
	return []string{
		utils.StorageId,
		utils.GotrueId,
		utils.RealtimeId,
		utils.PoolerId,
	}
}
// resetRemote resets a linked remote database by dropping all user schemas
// and replaying migrations and seeds from the local project.
func resetRemote(ctx context.Context, version string, config pgconn.Config, fsys afero.Fs, options ...func(*pgx.ConnConfig)) error {
	fmt.Fprintln(os.Stderr, "Resetting remote database"+toLogMessage(version))
	conn, err := utils.ConnectByConfigStream(ctx, config, io.Discard, options...)
	if err != nil {
		return err
	}
	defer conn.Close(context.Background())
	if err := migration.DropUserSchemas(ctx, conn); err != nil {
		return err
	}
	// Vault secrets are managed outside user schemas and must be restored
	// explicitly after the drop.
	if err := vault.UpsertVaultSecrets(ctx, utils.Config.Db.Vault, conn); err != nil {
		return err
	}
	return apply.MigrateAndSeed(ctx, version, conn, fsys)
}
// LikeEscapeSchema converts glob-style schema patterns into SQL LIKE
// patterns: underscores are escaped so they match literally, and * becomes
// the LIKE wildcard %. A nil or empty input returns nil.
func LikeEscapeSchema(schemas []string) (result []string) {
	// Treat _ as literal, * as any sequence of characters (LIKE's %)
	if len(schemas) == 0 {
		// Preserve nil for empty input rather than allocating.
		return nil
	}
	replacer := strings.NewReplacer("_", `\_`, "*", "%")
	// Pre-size to avoid repeated growth during append.
	result = make([]string, 0, len(schemas))
	for _, sch := range schemas {
		result = append(result, replacer.Replace(sch))
	}
	return result
}