-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnamespace.go
386 lines (329 loc) · 10 KB
/
namespace.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
package turbopg
import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"

	"github.com/golang-migrate/migrate/v4"
)
// IndexConfig holds configuration for the vector similarity index that is
// built for a namespace table.
type IndexConfig struct {
	// DistanceMetric is the distance metric used for vector similarity.
	// Must be one of: "cosine_distance" or "euclidean_squared".
	DistanceMetric string
	// Lists is the number of IVF lists used for approximate search.
	// Higher values = faster search, less accurate.
	// Lower values = slower search, more accurate.
	Lists int
}
// NewDefaultIndexConfig returns a freshly allocated IndexConfig populated with
// the package defaults: the "cosine_distance" metric and 100 IVF lists.
// Each call returns a distinct value, so callers may modify the result freely.
func NewDefaultIndexConfig() *IndexConfig {
	cfg := new(IndexConfig)
	cfg.DistanceMetric = "cosine_distance"
	cfg.Lists = 100 // TurboPuffer default
	return cfg
}
// CreateNamespaceOptions holds options for namespace creation.
type CreateNamespaceOptions struct {
	// Dimensions is required: the number of dimensions for vectors in this
	// namespace. It must be positive.
	Dimensions int
	// IndexConfig is optional: the index configuration. When nil, the
	// configuration returned by NewDefaultIndexConfig is used.
	IndexConfig *IndexConfig
}
// ListNamespacesOptions holds options for listing namespaces.
type ListNamespacesOptions struct {
	// Prefix is an optional prefix to filter namespaces by.
	// An empty string disables filtering.
	Prefix string
	// Limit is the maximum number of namespaces to return.
	// Values <= 0 mean no limit is applied.
	Limit int
}
// ListNamespacesResponse holds the response from ListNamespaces.
type ListNamespacesResponse struct {
	// Namespaces is the list of namespace names, ordered by name.
	Namespaces []string
	// Total is the number of namespaces matching the prefix filter
	// (ignoring limit).
	Total int
}
// Namespace represents a namespace and its configuration, as returned by
// GetNamespace.
type Namespace struct {
	// Name of the namespace.
	Name string
	// Dimensions is the number of dimensions in vectors.
	Dimensions int
	// IndexConfig is the index configuration recorded for the namespace.
	IndexConfig *IndexConfig
}
// CreateNamespace creates a new namespace with the given configuration.
//
// It validates the namespace name, the vector dimensions and the index
// configuration, appends and runs a migration that creates the namespace table
// together with its ivfflat index, verifies that the table is visible in the
// public schema, and finally upserts the namespace's dimensions and index
// configuration into the system "namespaces" table.
//
// When opts.IndexConfig is nil the configuration from NewDefaultIndexConfig is
// used. All validation happens before the migration is appended, so an invalid
// request leaves no migration behind.
//
// NOTE(review): the table is created with CREATE TABLE IF NOT EXISTS while the
// metadata row is overwritten on conflict, so calling this for an existing
// namespace with different dimensions updates the metadata but not the table.
// Confirm whether that should be rejected instead.
func (s *Store) CreateNamespace(ctx context.Context, namespace string, opts CreateNamespaceOptions) error {
	// Validate namespace name
	if err := ValidateNamespace(namespace); err != nil {
		return fmt.Errorf("invalid namespace name: %w", err)
	}

	// Validate dimensions
	if opts.Dimensions <= 0 {
		return fmt.Errorf("dimensions must be positive, got %d", opts.Dimensions)
	}

	// Use default index config if none provided
	indexConfig := opts.IndexConfig
	if indexConfig == nil {
		indexConfig = NewDefaultIndexConfig()
	}

	// Validate the distance metric and pick the matching index operator class
	var operator string
	switch indexConfig.DistanceMetric {
	case "cosine_distance":
		operator = "vector_cosine_ops"
	case "euclidean_squared":
		operator = "vector_l2_ops"
	default:
		return fmt.Errorf("invalid distance metric: %s", indexConfig.DistanceMetric)
	}

	// The list count is interpolated into "WITH (lists = N)". Reject
	// non-positive values up front so that a bad request fails with a clear
	// error instead of after a migration has already been appended.
	if indexConfig.Lists <= 0 {
		return fmt.Errorf("lists must be positive, got %d", indexConfig.Lists)
	}

	// Build table name
	tableName := GetNamespaceTableName(s.prefix, namespace)

	// Create up migration
	upSQL := fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id TEXT PRIMARY KEY,
vector vector(%d),
attributes JSONB
);
CREATE INDEX IF NOT EXISTS %s_vector_idx
ON %s
USING ivfflat (vector %s)
WITH (lists = %d);`,
		tableName, opts.Dimensions,
		tableName, tableName, operator, indexConfig.Lists)

	// Create down migration
	downSQL := fmt.Sprintf(`
DROP TABLE IF EXISTS %s CASCADE;`,
		tableName)

	// Add migration
	if err := s.migrator.Append(ctx, fmt.Sprintf("create_%s", namespace), upSQL, downSQL); err != nil {
		return fmt.Errorf("add migration: %w", err)
	}

	// Run migration. "No change" is not a failure; errors.Is also recognises
	// the sentinel when the migrator wraps it.
	if err := s.migrator.Up(ctx); err != nil {
		noChange := errors.Is(err, migrate.ErrNoChange)
		s.logger.Info("migration result",
			Field{Key: "error", Value: err.Error()},
			Field{Key: "is_no_change", Value: noChange},
		)
		if !noChange {
			return fmt.Errorf("run migration: %w", err)
		}
	}

	// Verify table exists
	var exists bool
	query := `
SELECT EXISTS (
SELECT FROM pg_tables
WHERE schemaname = 'public'
AND tablename = $1
)`
	if err := s.db.QueryRowContext(ctx, query, tableName).Scan(&exists); err != nil {
		return fmt.Errorf("check table existence: %w", err)
	}
	if !exists {
		s.logger.Info("table not found",
			Field{Key: "table", Value: tableName},
			Field{Key: "sql", Value: upSQL},
		)
		return fmt.Errorf("table %s was not created", tableName)
	}

	// Write metadata to system table
	sysTable := GetSystemTableName(s.prefix, "namespaces")
	indexConfigJSON, err := json.Marshal(indexConfig)
	if err != nil {
		return fmt.Errorf("marshal index config: %w", err)
	}
	query = fmt.Sprintf(`
INSERT INTO %s (namespace, dimensions, index_config, created_at, updated_at)
VALUES ($1, $2, $3, NOW(), NOW())
ON CONFLICT (namespace) DO UPDATE SET
dimensions = $2,
index_config = $3,
updated_at = NOW()`,
		sysTable)
	if _, err := s.db.ExecContext(ctx, query, namespace, opts.Dimensions, indexConfigJSON); err != nil {
		return fmt.Errorf("write metadata: %w", err)
	}

	s.logger.Info("created namespace",
		Field{Key: "namespace", Value: namespace},
		Field{Key: "dimensions", Value: opts.Dimensions},
		Field{Key: "distance_metric", Value: indexConfig.DistanceMetric},
	)
	return nil
}
// DeleteNamespace deletes a namespace and all its data.
// This operation is idempotent - deleting a non-existent namespace is not an error.
//
// When the namespace table exists, a migration that drops it is appended and
// run. The namespace's row in the system "namespaces" table is removed in
// every case, so a retry after a partially failed deletion (table already
// dropped, metadata still present) cleans up the leftover row instead of
// returning early and leaving it behind.
func (s *Store) DeleteNamespace(ctx context.Context, namespace string) error {
	// Validate namespace name
	if err := ValidateNamespace(namespace); err != nil {
		return fmt.Errorf("invalid namespace name: %w", err)
	}

	// Build table name
	tableName := GetNamespaceTableName(s.prefix, namespace)

	// For deletion, we first check if the table exists
	var exists bool
	query := `
SELECT EXISTS (
SELECT FROM pg_tables
WHERE schemaname = 'public'
AND tablename = $1
)`
	if err := s.db.QueryRowContext(ctx, query, tableName).Scan(&exists); err != nil {
		return fmt.Errorf("check table existence: %w", err)
	}

	if exists {
		// Create up migration (delete)
		upSQL := fmt.Sprintf(`
DROP TABLE IF EXISTS %s CASCADE;`,
			tableName)

		// Create down migration (recreate)
		// Note: the recreated table uses a placeholder dimension and no index.
		// This is a limitation - the down migration will need manual
		// adjustment if needed.
		downSQL := fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id TEXT PRIMARY KEY,
vector vector(1), -- placeholder dimension
attributes JSONB
);`,
			tableName)

		// Add migration
		if err := s.migrator.Append(ctx, fmt.Sprintf("delete_%s", namespace), upSQL, downSQL); err != nil {
			return fmt.Errorf("add migration: %w", err)
		}

		// Run migration; "no change" is not a failure, even when wrapped.
		if err := s.migrator.Up(ctx); err != nil && !errors.Is(err, migrate.ErrNoChange) {
			return fmt.Errorf("run migration: %w", err)
		}
	} else {
		// Nothing to drop, but a metadata row may have survived an earlier
		// partially failed deletion, so fall through to the cleanup below.
		s.logger.Info("namespace table does not exist, removing any stale metadata",
			Field{Key: "namespace", Value: namespace},
		)
	}

	// Delete metadata from system table (a no-op when no row exists)
	sysTable := GetSystemTableName(s.prefix, "namespaces")
	query = fmt.Sprintf(`
DELETE FROM %s
WHERE namespace = $1`,
		sysTable)
	if _, err := s.db.ExecContext(ctx, query, namespace); err != nil {
		return fmt.Errorf("delete metadata: %w", err)
	}

	if exists {
		s.logger.Info("deleted namespace",
			Field{Key: "namespace", Value: namespace},
		)
	}
	return nil
}
// ListNamespaces lists the namespaces recorded in the system "namespaces"
// table, ordered by name.
//
// When opts.Prefix is non-empty only namespaces whose name starts with that
// exact prefix are considered; the LIKE metacharacters '%', '_' and '\' in the
// prefix are escaped so they match literally. When opts.Limit is positive at
// most that many names are returned. Total in the response is the number of
// namespaces matching the prefix filter, regardless of the limit.
//
// Note that the count and the listing are two separate queries, so Total may
// differ from the listing if namespaces change in between.
func (s *Store) ListNamespaces(ctx context.Context, opts ListNamespacesOptions) (*ListNamespacesResponse, error) {
	sysTable := GetSystemTableName(s.prefix, "namespaces")

	// Build the optional prefix filter shared by the count and list queries.
	var (
		where string
		args  []interface{}
	)
	if opts.Prefix != "" {
		// Escape LIKE metacharacters with backslash, the default LIKE escape
		// character in PostgreSQL. Working byte-wise is safe for UTF-8 because
		// these ASCII bytes never occur inside a multi-byte sequence.
		escaped := make([]byte, 0, len(opts.Prefix)+1)
		for i := 0; i < len(opts.Prefix); i++ {
			switch c := opts.Prefix[i]; c {
			case '\\', '%', '_':
				escaped = append(escaped, '\\', c)
			default:
				escaped = append(escaped, c)
			}
		}
		args = append(args, string(escaped)+"%")
		where = fmt.Sprintf(" WHERE namespace LIKE $%d", len(args))
	}

	// Get total count (including prefix filter, ignoring limit)
	var total int
	countQuery := fmt.Sprintf("SELECT COUNT(*) FROM %s%s", sysTable, where)
	if err := s.db.QueryRowContext(ctx, countQuery, args...).Scan(&total); err != nil {
		return nil, fmt.Errorf("count namespaces: %w", err)
	}

	// Build the listing query; the limit is bound as an extra parameter.
	query := fmt.Sprintf("SELECT namespace FROM %s%s ORDER BY namespace", sysTable, where)
	if opts.Limit > 0 {
		args = append(args, opts.Limit)
		query += fmt.Sprintf(" LIMIT $%d", len(args))
	}

	rows, err := s.db.QueryContext(ctx, query, args...)
	if err != nil {
		return nil, fmt.Errorf("list namespaces: %w", err)
	}
	defer rows.Close()

	// Extract namespace names
	var namespaces []string
	for rows.Next() {
		var namespace string
		if err := rows.Scan(&namespace); err != nil {
			return nil, fmt.Errorf("scan namespace: %w", err)
		}
		namespaces = append(namespaces, namespace)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterate namespaces: %w", err)
	}

	return &ListNamespacesResponse{
		Namespaces: namespaces,
		Total:      total,
	}, nil
}
// GetNamespace gets information about a specific namespace.
//
// It validates the name, checks that the namespace table exists in the public
// schema, and then reads the namespace's dimensions and JSON-encoded index
// configuration from the system "namespaces" table.
//
// An error is returned when the name is invalid, when the table does not
// exist, when the table exists but no metadata row is found, or when a query
// or the JSON decoding fails.
func (s *Store) GetNamespace(ctx context.Context, namespace string) (*Namespace, error) {
	// Validate namespace name
	if err := ValidateNamespace(namespace); err != nil {
		return nil, fmt.Errorf("invalid namespace name: %w", err)
	}

	// Build table name
	tableName := GetNamespaceTableName(s.prefix, namespace)

	// First check if the table exists
	var exists bool
	query := `
SELECT EXISTS (
SELECT FROM pg_tables
WHERE schemaname = 'public'
AND tablename = $1
)`
	if err := s.db.QueryRowContext(ctx, query, tableName).Scan(&exists); err != nil {
		return nil, fmt.Errorf("check table existence: %w", err)
	}
	if !exists {
		return nil, fmt.Errorf("namespace %q does not exist", namespace)
	}

	// Get metadata from system table
	sysTable := GetSystemTableName(s.prefix, "namespaces")
	var (
		dimensions  int
		indexConfig IndexConfig
		configJSON  []byte
	)
	query = fmt.Sprintf(`
SELECT dimensions, index_config
FROM %s
WHERE namespace = $1`,
		sysTable)
	if err := s.db.QueryRowContext(ctx, query, namespace).Scan(&dimensions, &configJSON); err != nil {
		// errors.Is also matches a wrapped sql.ErrNoRows, so the dedicated
		// "metadata not found" message is produced either way.
		if errors.Is(err, sql.ErrNoRows) {
			return nil, fmt.Errorf("namespace %q metadata not found", namespace)
		}
		return nil, fmt.Errorf("get metadata: %w", err)
	}

	if err := json.Unmarshal(configJSON, &indexConfig); err != nil {
		return nil, fmt.Errorf("unmarshal index config: %w", err)
	}

	return &Namespace{
		Name:        namespace,
		Dimensions:  dimensions,
		IndexConfig: &indexConfig,
	}, nil
}