-
Notifications
You must be signed in to change notification settings - Fork 258
/
Copy pathsqlite.go
140 lines (126 loc) · 3.74 KB
/
sqlite.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
//go:build cgo
// +build cgo
package sqlite
import (
"context"
"database/sql"
"fmt"
"os"
"time"
"github.com/k3s-io/kine/pkg/drivers/generic"
"github.com/k3s-io/kine/pkg/logstructured"
"github.com/k3s-io/kine/pkg/logstructured/sqllog"
"github.com/k3s-io/kine/pkg/server"
"github.com/k3s-io/kine/pkg/util"
"github.com/mattn/go-sqlite3"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
// sqlite db driver
_ "github.com/mattn/go-sqlite3"
)
var (
schema = []string{
`CREATE TABLE IF NOT EXISTS kine
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
name INTEGER,
created INTEGER,
deleted INTEGER,
create_revision INTEGER,
prev_revision INTEGER,
lease INTEGER,
value BLOB,
old_value BLOB
)`,
`CREATE INDEX IF NOT EXISTS kine_name_index ON kine (name)`,
`CREATE INDEX IF NOT EXISTS kine_name_id_index ON kine (name,id)`,
`CREATE INDEX IF NOT EXISTS kine_id_deleted_index ON kine (id,deleted)`,
`CREATE INDEX IF NOT EXISTS kine_prev_revision_index ON kine (prev_revision)`,
`CREATE UNIQUE INDEX IF NOT EXISTS kine_name_prev_revision_uindex ON kine (name, prev_revision)`,
`PRAGMA wal_checkpoint(TRUNCATE)`,
}
)
func New(ctx context.Context, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, error) {
backend, _, err := NewVariant(ctx, "sqlite3", dataSourceName, connPoolConfig, metricsRegisterer)
return backend, err
}
func NewVariant(ctx context.Context, driverName, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig, metricsRegisterer prometheus.Registerer) (server.Backend, *generic.Generic, error) {
if dataSourceName == "" {
if err := os.MkdirAll("./db", 0700); err != nil {
return nil, nil, err
}
dataSourceName = "./db/state.db?_journal=WAL&cache=shared"
}
dialect, err := generic.Open(ctx, driverName, dataSourceName, connPoolConfig, "?", false, metricsRegisterer)
if err != nil {
return nil, nil, err
}
dialect.LastInsertID = true
dialect.GetSizeSQL = `SELECT SUM(pgsize) FROM dbstat`
dialect.CompactSQL = `
DELETE FROM kine AS kv
WHERE
kv.id IN (
SELECT kp.prev_revision AS id
FROM kine AS kp
WHERE
kp.name != 'compact_rev_key' AND
kp.prev_revision != 0 AND
kp.id <= ?
UNION
SELECT kd.id AS id
FROM kine AS kd
WHERE
kd.deleted != 0 AND
kd.id <= ?
)`
dialect.PostCompactSQL = `PRAGMA wal_checkpoint(FULL)`
dialect.TranslateErr = func(err error) error {
if err, ok := err.(sqlite3.Error); ok && err.ExtendedCode == sqlite3.ErrConstraintUnique {
return server.ErrKeyExists
}
return err
}
dialect.ErrCode = func(err error) string {
if err == nil {
return ""
}
if err, ok := err.(sqlite3.Error); ok {
return fmt.Sprint(err.ExtendedCode)
}
return err.Error()
}
// this is the first SQL that will be executed on a new DB conn so
// loop on failure here because in the case of dqlite it could still be initializing
for i := 0; i < 300; i++ {
err = setup(dialect.DB)
if err == nil {
break
}
logrus.Errorf("failed to setup db: %v", err)
select {
case <-ctx.Done():
return nil, nil, ctx.Err()
case <-time.After(time.Second):
}
time.Sleep(time.Second)
}
if err != nil {
return nil, nil, errors.Wrap(err, "setup db")
}
dialect.Migrate(context.Background())
return logstructured.New(sqllog.New(dialect)), dialect, nil
}
func setup(db *sql.DB) error {
logrus.Infof("Configuring database table schema and indexes, this may take a moment...")
for _, stmt := range schema {
logrus.Tracef("SETUP EXEC : %v", util.Stripped(stmt))
_, err := db.Exec(stmt)
if err != nil {
return err
}
}
logrus.Infof("Database tables and indexes are up to date")
return nil
}